diff --git a/src/net.cpp b/src/net.cpp index 16591461efb..7b40e019fad 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -3139,12 +3139,12 @@ void CConnman::ThreadMessageHandler() continue; // Receive messages - bool fMoreNodeWork = m_msgproc->ProcessMessages(pnode, flagInterruptMsgProc); + bool fMoreNodeWork{m_msgproc->ProcessMessages(*pnode, flagInterruptMsgProc)}; fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend); if (flagInterruptMsgProc) return; // Send messages - m_msgproc->SendMessages(pnode); + m_msgproc->SendMessages(*pnode); if (flagInterruptMsgProc) return; diff --git a/src/net.h b/src/net.h index 1e4e9124e9f..1f7bbe4a500 100644 --- a/src/net.h +++ b/src/net.h @@ -1043,21 +1043,21 @@ public: virtual bool HasAllDesirableServiceFlags(ServiceFlags services) const = 0; /** - * Process protocol messages received from a given node - * - * @param[in] pnode The node which we have received messages from. - * @param[in] interrupt Interrupt condition for processing threads - * @return True if there is more work to be done - */ - virtual bool ProcessMessages(CNode* pnode, std::atomic& interrupt) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex) = 0; + * Process protocol messages received from a given node + * + * @param[in] node The node which we have received messages from. + * @param[in] interrupt Interrupt condition for processing threads + * @return True if there is more work to be done + */ + virtual bool ProcessMessages(CNode& node, std::atomic& interrupt) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex) = 0; /** - * Send queued protocol messages to a given node. - * - * @param[in] pnode The node which we are sending messages to. - * @return True if there is more work to be done - */ - virtual bool SendMessages(CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex) = 0; + * Send queued protocol messages to a given node. + * + * @param[in] node The node which we are sending messages to. + * @return True if there is more work to be done + */ + virtual bool SendMessages(CNode& node) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex) = 0; protected: diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 7da26174a76..e5b4bc7772d 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -529,9 +529,9 @@ public: void InitializeNode(const CNode& node, ServiceFlags our_services) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_tx_download_mutex); void FinalizeNode(const CNode& node) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_headers_presync_mutex, !m_tx_download_mutex); bool HasAllDesirableServiceFlags(ServiceFlags services) const override; - bool ProcessMessages(CNode* pfrom, std::atomic& interrupt) override + bool ProcessMessages(CNode& node, std::atomic& interrupt) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_most_recent_block_mutex, !m_headers_presync_mutex, g_msgproc_mutex, !m_tx_download_mutex); - bool SendMessages(CNode* pto) override + bool SendMessages(CNode& node) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_most_recent_block_mutex, g_msgproc_mutex, !m_tx_download_mutex); /** Implement PeerManager */ @@ -551,13 +551,14 @@ public: m_best_block_time = time; }; void UnitTestMisbehaving(NodeId peer_id) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex) { Misbehaving(*Assert(GetPeerRef(peer_id)), ""); }; - void ProcessMessage(CNode& pfrom, const std::string& msg_type, DataStream& vRecv, - std::chrono::microseconds time_received, const std::atomic& interruptMsgProc) override - EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_most_recent_block_mutex, !m_headers_presync_mutex, g_msgproc_mutex, !m_tx_download_mutex); void UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) override; ServiceFlags GetDesirableServiceFlags(ServiceFlags services) const override; private: + void ProcessMessage(Peer& peer, CNode& pfrom, const std::string& msg_type, DataStream& vRecv, std::chrono::microseconds time_received, + const std::atomic& interruptMsgProc) + EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_most_recent_block_mutex, !m_headers_presync_mutex, g_msgproc_mutex, !m_tx_download_mutex); + /** Consider evicting an outbound peer based on the amount of time they've been behind our tip */ void ConsiderEviction(CNode& pto, Peer& peer, std::chrono::seconds time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_msgproc_mutex); @@ -3545,7 +3546,7 @@ void PeerManagerImpl::PushPrivateBroadcastTx(CNode& node) MakeAndPushMessage(node, NetMsgType::INV, std::vector{{CInv{MSG_TX, tx->GetHash().ToUint256()}}}); } -void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, DataStream& vRecv, +void PeerManagerImpl::ProcessMessage(Peer& peer, CNode& pfrom, const std::string& msg_type, DataStream& vRecv, const std::chrono::microseconds time_received, const std::atomic& interruptMsgProc) { @@ -3553,8 +3554,6 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, LogDebug(BCLog::NET, "received: %s (%u bytes) peer=%d\n", SanitizeString(msg_type), vRecv.size(), pfrom.GetId()); - PeerRef peer = GetPeerRef(pfrom.GetId()); - if (peer == nullptr) return; if (msg_type == NetMsgType::VERSION) { if (pfrom.nVersion != 0) { @@ -3635,7 +3634,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, // Inbound peers send us their version message when they connect. // We send our version message in response. if (pfrom.IsInboundConn()) { - PushNodeVersion(pfrom, *peer); + PushNodeVersion(pfrom, peer); } // Change version @@ -3644,13 +3643,13 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, pfrom.nVersion = nVersion; pfrom.m_has_all_wanted_services = HasAllDesirableServiceFlags(nServices); - peer->m_their_services = nServices; + peer.m_their_services = nServices; pfrom.SetAddrLocal(addrMe); { LOCK(pfrom.m_subver_mutex); pfrom.cleanSubVer = cleanSubVer; } - peer->m_starting_height = starting_height; + peer.m_starting_height = starting_height; // Only initialize the Peer::TxRelay m_relay_txs data structure if: // - this isn't an outbound block-relay-only connection, and @@ -3660,8 +3659,8 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, // the peer may turn on transaction relay later. if (!pfrom.IsBlockOnlyConn() && !pfrom.IsFeelerConn() && - (fRelay || (peer->m_our_services & NODE_BLOOM))) { - auto* const tx_relay = peer->SetTxRelay(); + (fRelay || (peer.m_our_services & NODE_BLOOM))) { + auto* const tx_relay = peer.SetTxRelay(); { LOCK(tx_relay->m_bloom_filter_mutex); tx_relay->m_relay_txs = fRelay; // set to true after we get the first filter* message @@ -3672,7 +3671,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, const auto mapped_as{m_connman.GetMappedAS(pfrom.addr)}; LogDebug(BCLog::NET, "receive version message: %s: version %d, blocks=%d, us=%s, txrelay=%d, peer=%d%s%s\n", cleanSubVer, pfrom.nVersion, - peer->m_starting_height, addrMe.ToStringAddrPort(), fRelay, pfrom.GetId(), + peer.m_starting_height, addrMe.ToStringAddrPort(), fRelay, pfrom.GetId(), pfrom.LogIP(fLogIPs), (mapped_as ? strprintf(", mapped_as=%d", mapped_as) : "")); if (pfrom.IsPrivateBroadcastConn()) { @@ -3706,7 +3705,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, // - this is not a block-relay-only connection and not a feeler // - this is not an addr fetch connection; // - we are not in -blocksonly mode. - const auto* tx_relay = peer->GetTxRelay(); + const auto* tx_relay = peer.GetTxRelay(); if (tx_relay && WITH_LOCK(tx_relay->m_bloom_filter_mutex, return tx_relay->m_relay_txs) && !pfrom.IsAddrFetchConn() && !m_opts.ignore_incoming_txs) { const uint64_t recon_salt = m_txreconciliation->PreRegisterPeer(pfrom.GetId()); @@ -3721,7 +3720,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, { LOCK(cs_main); CNodeState* state = State(pfrom.GetId()); - state->fPreferredDownload = (!pfrom.IsInboundConn() || pfrom.HasPermission(NetPermissionFlags::NoBan)) && !pfrom.IsAddrFetchConn() && CanServeBlocks(*peer); + state->fPreferredDownload = (!pfrom.IsInboundConn() || pfrom.HasPermission(NetPermissionFlags::NoBan)) && !pfrom.IsAddrFetchConn() && CanServeBlocks(peer); m_num_preferred_download_peers += state->fPreferredDownload; } @@ -3730,7 +3729,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, // inbound or outbound block-relay-only peers. bool send_getaddr{false}; if (!pfrom.IsInboundConn()) { - send_getaddr = SetupAddressRelay(pfrom, *peer); + send_getaddr = SetupAddressRelay(pfrom, peer); } if (send_getaddr) { // Do a one-time address fetch to help populate/update our addrman. @@ -3740,10 +3739,10 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, // potentially leaking addr information and we do not want to // indicate to the peer that we will participate in addr relay. MakeAndPushMessage(pfrom, NetMsgType::GETADDR); - peer->m_getaddr_sent = true; + peer.m_getaddr_sent = true; // When requesting a getaddr, accept an additional MAX_ADDR_TO_SEND addresses in response // (bypassing the MAX_ADDR_PROCESSING_TOKEN_BUCKET limit). - peer->m_addr_token_bucket += MAX_ADDR_TO_SEND; + peer.m_addr_token_bucket += MAX_ADDR_TO_SEND; } if (!pfrom.IsInboundConn()) { @@ -3764,11 +3763,11 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, m_addrman.Good(pfrom.addr); } - peer->m_time_offset = NodeSeconds{std::chrono::seconds{nTime}} - Now(); + peer.m_time_offset = NodeSeconds{std::chrono::seconds{nTime}} - Now(); if (!pfrom.IsInboundConn()) { // Don't use timedata samples from inbound peers to make it // harder for others to create false warnings about our clock being out of sync. - m_outbound_time_offsets.Add(peer->m_time_offset); + m_outbound_time_offsets.Add(peer.m_time_offset); m_outbound_time_offsets.WarnIfOutOfSync(); } @@ -3803,7 +3802,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, return strprintf("New %s peer connected: transport: %s, version: %d, blocks=%d peer=%d%s%s\n", pfrom.ConnectionTypeAsString(), TransportTypeAsString(pfrom.m_transport->GetInfo().transport_type), - pfrom.nVersion.load(), peer->m_starting_height, + pfrom.nVersion.load(), peer.m_starting_height, pfrom.GetId(), pfrom.LogIP(fLogIPs), (mapped_as ? strprintf(", mapped_as=%d", mapped_as) : "")); }; @@ -3816,7 +3815,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, LogInfo("%s", new_peer_msg()); } - if (auto tx_relay = peer->GetTxRelay()) { + if (auto tx_relay = peer.GetTxRelay()) { // `TxRelay::m_tx_inventory_to_send` must be empty before the // version handshake is completed as // `TxRelay::m_next_inv_send_time` is first initialised in @@ -3851,7 +3850,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, } if (m_txreconciliation) { - if (!peer->m_wtxid_relay || !m_txreconciliation->IsPeerRegistered(pfrom.GetId())) { + if (!peer.m_wtxid_relay || !m_txreconciliation->IsPeerRegistered(pfrom.GetId())) { // We could have optimistically pre-registered/registered the peer. In that case, // we should forget about the reconciliation state here if this wasn't followed // by WTXIDRELAY (since WTXIDRELAY can't be announced later). @@ -3865,7 +3864,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, m_txdownloadman.ConnectedPeer(pfrom.GetId(), node::TxDownloadConnectionInfo { .m_preferred = state->fPreferredDownload, .m_relay_permissions = pfrom.HasPermission(NetPermissionFlags::Relay), - .m_wtxid_relay = peer->m_wtxid_relay, + .m_wtxid_relay = peer.m_wtxid_relay, }); } @@ -3874,7 +3873,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, } if (msg_type == NetMsgType::SENDHEADERS) { - peer->m_prefers_headers = true; + peer.m_prefers_headers = true; return; } @@ -3906,8 +3905,8 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, return; } if (pfrom.GetCommonVersion() >= WTXID_RELAY_VERSION) { - if (!peer->m_wtxid_relay) { - peer->m_wtxid_relay = true; + if (!peer.m_wtxid_relay) { + peer.m_wtxid_relay = true; m_wtxid_relay_peers++; } else { LogDebug(BCLog::NET, "ignoring duplicate wtxidrelay from peer=%d\n", pfrom.GetId()); @@ -3927,7 +3926,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, pfrom.fDisconnect = true; return; } - peer->m_wants_addrv2 = true; + peer.m_wants_addrv2 = true; return; } @@ -3956,7 +3955,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, // Peer must not offer us reconciliations if they specified no tx relay support in VERSION. // This flag might also be false in other cases, but the RejectIncomingTxs check above // eliminates them, so that this flag fully represents what we are looking for. - const auto* tx_relay = peer->GetTxRelay(); + const auto* tx_relay = peer.GetTxRelay(); if (!tx_relay || !WITH_LOCK(tx_relay->m_bloom_filter_mutex, return tx_relay->m_relay_txs)) { LogDebug(BCLog::NET, "sendtxrcncl received which indicated no tx relay to us, %s\n", pfrom.DisconnectMsg(fLogIPs)); pfrom.fDisconnect = true; @@ -4012,14 +4011,14 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, vRecv >> ser_params(vAddr); - if (!SetupAddressRelay(pfrom, *peer)) { + if (!SetupAddressRelay(pfrom, peer)) { LogDebug(BCLog::NET, "ignoring %s message from %s peer=%d\n", msg_type, pfrom.ConnectionTypeAsString(), pfrom.GetId()); return; } if (vAddr.size() > MAX_ADDR_TO_SEND) { - Misbehaving(*peer, strprintf("%s message size = %u", msg_type, vAddr.size())); + Misbehaving(peer, strprintf("%s message size = %u", msg_type, vAddr.size())); return; } @@ -4029,13 +4028,13 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, // Update/increment addr rate limiting bucket. const auto current_time{GetTime()}; - if (peer->m_addr_token_bucket < MAX_ADDR_PROCESSING_TOKEN_BUCKET) { + if (peer.m_addr_token_bucket < MAX_ADDR_PROCESSING_TOKEN_BUCKET) { // Don't increment bucket if it's already full - const auto time_diff = std::max(current_time - peer->m_addr_token_timestamp, 0us); + const auto time_diff = std::max(current_time - peer.m_addr_token_timestamp, 0us); const double increment = Ticks(time_diff) * MAX_ADDR_RATE_PER_SECOND; - peer->m_addr_token_bucket = std::min(peer->m_addr_token_bucket + increment, MAX_ADDR_PROCESSING_TOKEN_BUCKET); + peer.m_addr_token_bucket = std::min(peer.m_addr_token_bucket + increment, MAX_ADDR_PROCESSING_TOKEN_BUCKET); } - peer->m_addr_token_timestamp = current_time; + peer.m_addr_token_timestamp = current_time; const bool rate_limited = !pfrom.HasPermission(NetPermissionFlags::Addr); uint64_t num_proc = 0; @@ -4047,13 +4046,13 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, return; // Apply rate limiting. - if (peer->m_addr_token_bucket < 1.0) { + if (peer.m_addr_token_bucket < 1.0) { if (rate_limited) { ++num_rate_limit; continue; } } else { - peer->m_addr_token_bucket -= 1.0; + peer.m_addr_token_bucket -= 1.0; } // We only bother storing full nodes, though this may include // things which we would not make an outbound connection to, in @@ -4064,14 +4063,14 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, if (addr.nTime <= NodeSeconds{100000000s} || addr.nTime > current_a_time + 10min) { addr.nTime = current_a_time - 5 * 24h; } - AddAddressKnown(*peer, addr); + AddAddressKnown(peer, addr); if (m_banman && (m_banman->IsDiscouraged(addr) || m_banman->IsBanned(addr))) { // Do not process banned/discouraged addresses beyond remembering we received them continue; } ++num_proc; const bool reachable{g_reachable_nets.Contains(addr)}; - if (addr.nTime > current_a_time - 10min && !peer->m_getaddr_sent && vAddr.size() <= 10 && addr.IsRoutable()) { + if (addr.nTime > current_a_time - 10min && !peer.m_getaddr_sent && vAddr.size() <= 10 && addr.IsRoutable()) { // Relay to a limited number of other nodes RelayAddress(pfrom.GetId(), addr, reachable); } @@ -4080,13 +4079,13 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, vAddrOk.push_back(addr); } } - peer->m_addr_processed += num_proc; - peer->m_addr_rate_limited += num_rate_limit; + peer.m_addr_processed += num_proc; + peer.m_addr_rate_limited += num_rate_limit; LogDebug(BCLog::NET, "Received addr: %u addresses (%u processed, %u rate-limited) from peer=%d\n", vAddr.size(), num_proc, num_rate_limit, pfrom.GetId()); - m_addrman.Add(vAddrOk, pfrom.addr, 2h); - if (vAddr.size() < 1000) peer->m_getaddr_sent = false; + m_addrman.Add(vAddrOk, pfrom.addr, /*time_penalty=*/2h); + if (vAddr.size() < 1000) peer.m_getaddr_sent = false; // AddrFetch: Require multiple addresses to avoid disconnecting on self-announcements if (pfrom.IsAddrFetchConn() && vAddr.size() > 1) { @@ -4101,7 +4100,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, vRecv >> vInv; if (vInv.size() > MAX_INV_SZ) { - Misbehaving(*peer, strprintf("inv message size = %u", vInv.size())); + Misbehaving(peer, strprintf("inv message size = %u", vInv.size())); return; } @@ -4118,7 +4117,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, // Ignore INVs that don't match wtxidrelay setting. // Note that orphan parent fetching always uses MSG_TX GETDATAs regardless of the wtxidrelay setting. // This is fine as no INV messages are involved in that process. - if (peer->m_wtxid_relay) { + if (peer.m_wtxid_relay) { if (inv.IsMsgTx()) continue; } else { if (inv.IsMsgWtx()) continue; @@ -4145,7 +4144,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, return; } const GenTxid gtxid = ToGenTxid(inv); - AddKnownTx(*peer, inv.hash); + AddKnownTx(peer, inv.hash); if (!m_chainman.IsInitialBlockDownload()) { const bool fAlreadyHave{m_txdownloadman.AddTxAnnouncement(pfrom.GetId(), gtxid, current_time)}; @@ -4168,14 +4167,14 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, // our initial peer is unresponsive (but less bandwidth than we'd // use if we turned on sync with all peers). CNodeState& state{*Assert(State(pfrom.GetId()))}; - if (state.fSyncStarted || (!peer->m_inv_triggered_getheaders_before_sync && *best_block != m_last_block_inv_triggering_headers_sync)) { - if (MaybeSendGetHeaders(pfrom, GetLocator(m_chainman.m_best_header), *peer)) { + if (state.fSyncStarted || (!peer.m_inv_triggered_getheaders_before_sync && *best_block != m_last_block_inv_triggering_headers_sync)) { + if (MaybeSendGetHeaders(pfrom, GetLocator(m_chainman.m_best_header), peer)) { LogDebug(BCLog::NET, "getheaders (%d) %s to peer=%d\n", m_chainman.m_best_header->nHeight, best_block->ToString(), pfrom.GetId()); } if (!state.fSyncStarted) { - peer->m_inv_triggered_getheaders_before_sync = true; + peer.m_inv_triggered_getheaders_before_sync = true; // Update the last block hash that triggered a new headers // sync, so that we don't turn on headers sync with more // than 1 new peer every new block. @@ -4192,7 +4191,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, vRecv >> vInv; if (vInv.size() > MAX_INV_SZ) { - Misbehaving(*peer, strprintf("getdata message size = %u", vInv.size())); + Misbehaving(peer, strprintf("getdata message size = %u", vInv.size())); return; } @@ -4219,8 +4218,8 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, MakeAndPushMessage(pfrom, NetMsgType::TX, TX_WITH_WITNESS(*pushed_tx)); - peer->m_ping_queued = true; // Ensure a ping will be sent: mimic a request via RPC. - MaybeSendPing(pfrom, *peer, GetTime()); + peer.m_ping_queued = true; // Ensure a ping will be sent: mimic a request via RPC. + MaybeSendPing(pfrom, peer, GetTime()); } else { LogDebug(BCLog::PRIVBROADCAST, "Disconnecting: got an unexpected GETDATA message, peer=%d%s", pfrom.GetId(), fLogIPs ? strprintf(", peeraddr=%s", pfrom.addr.ToStringAddrPort()) : ""); @@ -4230,9 +4229,9 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, } { - LOCK(peer->m_getdata_requests_mutex); - peer->m_getdata_requests.insert(peer->m_getdata_requests.end(), vInv.begin(), vInv.end()); - ProcessGetData(pfrom, *peer, interruptMsgProc); + LOCK(peer.m_getdata_requests_mutex); + peer.m_getdata_requests.insert(peer.m_getdata_requests.end(), vInv.begin(), vInv.end()); + ProcessGetData(pfrom, peer, interruptMsgProc); } return; @@ -4292,12 +4291,12 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, LogDebug(BCLog::NET, " getblocks stopping, pruned or too old block at %d %s\n", pindex->nHeight, pindex->GetBlockHash().ToString()); break; } - WITH_LOCK(peer->m_block_inv_mutex, peer->m_blocks_for_inv_relay.push_back(pindex->GetBlockHash())); + WITH_LOCK(peer.m_block_inv_mutex, peer.m_blocks_for_inv_relay.push_back(pindex->GetBlockHash())); if (--nLimit <= 0) { // When this block is requested, we'll send an inv that'll // trigger the peer to getblocks the next batch of inventory. LogDebug(BCLog::NET, " getblocks stopping at limit %d %s\n", pindex->nHeight, pindex->GetBlockHash().ToString()); - WITH_LOCK(peer->m_block_inv_mutex, {peer->m_continuation_block = pindex->GetBlockHash();}); + WITH_LOCK(peer.m_block_inv_mutex, {peer.m_continuation_block = pindex->GetBlockHash();}); break; } } @@ -4321,7 +4320,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, // Unlock m_most_recent_block_mutex to avoid cs_main lock inversion } if (recent_block) { - SendBlockTransactions(pfrom, *peer, *recent_block, req); + SendBlockTransactions(pfrom, peer, *recent_block, req); return; } @@ -4347,7 +4346,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, // pruned after we release cs_main above, so this read should never fail. assert(ret); - SendBlockTransactions(pfrom, *peer, block, req); + SendBlockTransactions(pfrom, peer, block, req); return; } @@ -4360,7 +4359,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, // actually receive all the data read from disk over the network. LogDebug(BCLog::NET, "Peer %d sent us a getblocktxn for a block > %i deep\n", pfrom.GetId(), MAX_BLOCKTXN_DEPTH); CInv inv{MSG_WITNESS_BLOCK, req.blockhash}; - WITH_LOCK(peer->m_getdata_requests_mutex, peer->m_getdata_requests.push_back(inv)); + WITH_LOCK(peer.m_getdata_requests_mutex, peer.m_getdata_requests.push_back(inv)); // The message processing loop will go around again (without pausing) and we'll respond then return; } @@ -4462,8 +4461,8 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, const Txid& txid = ptx->GetHash(); const Wtxid& wtxid = ptx->GetWitnessHash(); - const uint256& hash = peer->m_wtxid_relay ? wtxid.ToUint256() : txid.ToUint256(); - AddKnownTx(*peer, hash); + const uint256& hash = peer.m_wtxid_relay ? wtxid.ToUint256() : txid.ToUint256(); + AddKnownTx(peer, hash); if (const auto num_broadcasted{m_tx_for_private_broadcast.Remove(ptx)}) { LogDebug(BCLog::PRIVBROADCAST, "Received our privately broadcast transaction (txid=%s) from the " @@ -4546,7 +4545,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, if (!prev_block) { // Doesn't connect (or is genesis), instead of DoSing in AcceptBlockHeader, request deeper headers if (!m_chainman.IsInitialBlockDownload()) { - MaybeSendGetHeaders(pfrom, GetLocator(m_chainman.m_best_header), *peer); + MaybeSendGetHeaders(pfrom, GetLocator(m_chainman.m_best_header), peer); } return; } else if (prev_block->nChainWork + GetBlockProof(cmpctblock.header) < GetAntiDoSWorkThreshold()) { @@ -4622,7 +4621,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, // We requested this block for some reason, but our mempool will probably be useless // so we just grab the block via normal getdata std::vector vInv(1); - vInv[0] = CInv(MSG_BLOCK | GetFetchFlags(*peer), blockhash); + vInv[0] = CInv(MSG_BLOCK | GetFetchFlags(peer), blockhash); MakeAndPushMessage(pfrom, NetMsgType::GETDATA, vInv); } return; @@ -4653,13 +4652,13 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, ReadStatus status = partialBlock.InitData(cmpctblock, vExtraTxnForCompact); if (status == READ_STATUS_INVALID) { RemoveBlockRequest(pindex->GetBlockHash(), pfrom.GetId()); // Reset in-flight state in case Misbehaving does not result in a disconnect - Misbehaving(*peer, "invalid compact block"); + Misbehaving(peer, "invalid compact block"); return; } else if (status == READ_STATUS_FAILED) { if (first_in_flight) { // Duplicate txindexes, the block is now in-flight, so just request it std::vector vInv(1); - vInv[0] = CInv(MSG_BLOCK | GetFetchFlags(*peer), blockhash); + vInv[0] = CInv(MSG_BLOCK | GetFetchFlags(peer), blockhash); MakeAndPushMessage(pfrom, NetMsgType::GETDATA, vInv); } else { // Give up for this peer and wait for other peer(s) @@ -4719,7 +4718,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, // We requested this block, but its far into the future, so our // mempool will probably be useless - request the block normally std::vector vInv(1); - vInv[0] = CInv(MSG_BLOCK | GetFetchFlags(*peer), blockhash); + vInv[0] = CInv(MSG_BLOCK | GetFetchFlags(peer), blockhash); MakeAndPushMessage(pfrom, NetMsgType::GETDATA, vInv); return; } else { @@ -4732,7 +4731,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, if (fProcessBLOCKTXN) { BlockTransactions txn; txn.blockhash = blockhash; - return ProcessCompactBlockTxns(pfrom, *peer, txn); + return ProcessCompactBlockTxns(pfrom, peer, txn); } if (fRevertToHeaderProcessing) { @@ -4741,7 +4740,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, // the peer if the header turns out to be for an invalid block. // Note that if a peer tries to build on an invalid chain, that // will be detected and the peer will be disconnected/discouraged. - return ProcessHeadersMessage(pfrom, *peer, {cmpctblock.header}, /*via_compact_block=*/true); + return ProcessHeadersMessage(pfrom, peer, {cmpctblock.header}, /*via_compact_block=*/true); } if (fBlockReconstructed) { @@ -4784,7 +4783,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, BlockTransactions resp; vRecv >> resp; - return ProcessCompactBlockTxns(pfrom, *peer, resp); + return ProcessCompactBlockTxns(pfrom, peer, resp); } if (msg_type == NetMsgType::HEADERS) @@ -4800,7 +4799,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, // Bypass the normal CBlock deserialization, as we don't want to risk deserializing 2000 full blocks. unsigned int nCount = ReadCompactSize(vRecv); if (nCount > m_opts.max_headers_result) { - Misbehaving(*peer, strprintf("headers message size = %u", nCount)); + Misbehaving(peer, strprintf("headers message size = %u", nCount)); return; } headers.resize(nCount); @@ -4809,7 +4808,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, ReadCompactSize(vRecv); // ignore tx count; assume it is 0. } - ProcessHeadersMessage(pfrom, *peer, std::move(headers), /*via_compact_block=*/false); + ProcessHeadersMessage(pfrom, peer, std::move(headers), /*via_compact_block=*/false); // Check if the headers presync progress needs to be reported to validation. // This needs to be done without holding the m_headers_presync_mutex lock. @@ -4846,9 +4845,9 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, // Check for possible mutation if it connects to something we know so we can check for DEPLOYMENT_SEGWIT being active if (prev_block && IsBlockMutated(/*block=*/*pblock, /*check_witness_root=*/DeploymentActiveAfter(prev_block, m_chainman, Consensus::DEPLOYMENT_SEGWIT))) { - LogDebug(BCLog::NET, "Received mutated block from peer=%d\n", peer->m_id); - Misbehaving(*peer, "mutated block"); - WITH_LOCK(cs_main, RemoveBlockRequest(pblock->GetHash(), peer->m_id)); + LogDebug(BCLog::NET, "Received mutated block from peer=%d\n", peer.m_id); + Misbehaving(peer, "mutated block"); + WITH_LOCK(cs_main, RemoveBlockRequest(pblock->GetHash(), peer.m_id)); return; } @@ -4888,17 +4887,17 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, // Since this must be an inbound connection, SetupAddressRelay will // never fail. - Assume(SetupAddressRelay(pfrom, *peer)); + Assume(SetupAddressRelay(pfrom, peer)); // Only send one GetAddr response per connection to reduce resource waste // and discourage addr stamping of INV announcements. - if (peer->m_getaddr_recvd) { + if (peer.m_getaddr_recvd) { LogDebug(BCLog::NET, "Ignoring repeated \"getaddr\". peer=%d\n", pfrom.GetId()); return; } - peer->m_getaddr_recvd = true; + peer.m_getaddr_recvd = true; - peer->m_addrs_to_send.clear(); + peer.m_addrs_to_send.clear(); std::vector vAddr; if (pfrom.HasPermission(NetPermissionFlags::Addr)) { vAddr = m_connman.GetAddressesUnsafe(MAX_ADDR_TO_SEND, MAX_PCT_ADDR_TO_SEND, /*network=*/std::nullopt); @@ -4906,7 +4905,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, vAddr = m_connman.GetAddresses(pfrom, MAX_ADDR_TO_SEND, MAX_PCT_ADDR_TO_SEND); } for (const CAddress &addr : vAddr) { - PushAddress(*peer, addr); + PushAddress(peer, addr); } return; } @@ -4914,7 +4913,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, if (msg_type == NetMsgType::MEMPOOL) { // Only process received mempool messages if we advertise NODE_BLOOM // or if the peer has mempool permissions. - if (!(peer->m_our_services & NODE_BLOOM) && !pfrom.HasPermission(NetPermissionFlags::Mempool)) + if (!(peer.m_our_services & NODE_BLOOM) && !pfrom.HasPermission(NetPermissionFlags::Mempool)) { if (!pfrom.HasPermission(NetPermissionFlags::NoBan)) { @@ -4934,7 +4933,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, return; } - if (auto tx_relay = peer->GetTxRelay(); tx_relay != nullptr) { + if (auto tx_relay = peer.GetTxRelay(); tx_relay != nullptr) { LOCK(tx_relay->m_tx_inventory_mutex); tx_relay->m_send_mempool = true; } @@ -4972,11 +4971,11 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, vRecv >> nonce; // Only process pong message if there is an outstanding ping (old ping without nonce should never pong) - if (peer->m_ping_nonce_sent != 0) { - if (nonce == peer->m_ping_nonce_sent) { + if (peer.m_ping_nonce_sent != 0) { + if (nonce == peer.m_ping_nonce_sent) { // Matching pong received, this ping is no longer outstanding bPingFinished = true; - const auto ping_time = ping_end - peer->m_ping_start.load(); + const auto ping_time = ping_end - peer.m_ping_start.load(); if (ping_time.count() >= 0) { // Let connman know about this successful ping-pong pfrom.PongReceived(ping_time); @@ -5012,18 +5011,18 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, LogDebug(BCLog::NET, "pong peer=%d: %s, %x expected, %x received, %u bytes\n", pfrom.GetId(), sProblem, - peer->m_ping_nonce_sent, + peer.m_ping_nonce_sent, nonce, nAvail); } if (bPingFinished) { - peer->m_ping_nonce_sent = 0; + peer.m_ping_nonce_sent = 0; } return; } if (msg_type == NetMsgType::FILTERLOAD) { - if (!(peer->m_our_services & NODE_BLOOM)) { + if (!(peer.m_our_services & NODE_BLOOM)) { LogDebug(BCLog::NET, "filterload received despite not offering bloom services, %s\n", pfrom.DisconnectMsg(fLogIPs)); pfrom.fDisconnect = true; return; @@ -5034,8 +5033,8 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, if (!filter.IsWithinSizeConstraints()) { // There is no excuse for sending a too-large filter - Misbehaving(*peer, "too-large bloom filter"); - } else if (auto tx_relay = peer->GetTxRelay(); tx_relay != nullptr) { + Misbehaving(peer, "too-large bloom filter"); + } else if (auto tx_relay = peer.GetTxRelay(); tx_relay != nullptr) { { LOCK(tx_relay->m_bloom_filter_mutex); tx_relay->m_bloom_filter.reset(new CBloomFilter(filter)); @@ -5048,7 +5047,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, } if (msg_type == NetMsgType::FILTERADD) { - if (!(peer->m_our_services & NODE_BLOOM)) { + if (!(peer.m_our_services & NODE_BLOOM)) { LogDebug(BCLog::NET, "filteradd received despite not offering bloom services, %s\n", pfrom.DisconnectMsg(fLogIPs)); pfrom.fDisconnect = true; return; @@ -5061,7 +5060,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, bool bad = false; if (vData.size() > MAX_SCRIPT_ELEMENT_SIZE) { bad = true; - } else if (auto tx_relay = peer->GetTxRelay(); tx_relay != nullptr) { + } else if (auto tx_relay = peer.GetTxRelay(); tx_relay != nullptr) { LOCK(tx_relay->m_bloom_filter_mutex); if (tx_relay->m_bloom_filter) { tx_relay->m_bloom_filter->insert(vData); @@ -5070,18 +5069,18 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, } } if (bad) { - Misbehaving(*peer, "bad filteradd message"); + Misbehaving(peer, "bad filteradd message"); } return; } if (msg_type == NetMsgType::FILTERCLEAR) { - if (!(peer->m_our_services & NODE_BLOOM)) { + if (!(peer.m_our_services & NODE_BLOOM)) { LogDebug(BCLog::NET, "filterclear received despite not offering bloom services, %s\n", pfrom.DisconnectMsg(fLogIPs)); pfrom.fDisconnect = true; return; } - auto tx_relay = peer->GetTxRelay(); + auto tx_relay = peer.GetTxRelay(); if (!tx_relay) return; { @@ -5098,7 +5097,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, CAmount newFeeFilter = 0; vRecv >> newFeeFilter; if (MoneyRange(newFeeFilter)) { - if (auto tx_relay = peer->GetTxRelay(); tx_relay != nullptr) { + if (auto tx_relay = peer.GetTxRelay(); tx_relay != nullptr) { tx_relay->m_fee_filter_received = newFeeFilter; } LogDebug(BCLog::NET, "received: feefilter of %s from peer=%d\n", CFeeRate(newFeeFilter).ToString(), pfrom.GetId()); @@ -5107,17 +5106,17 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, } if (msg_type == NetMsgType::GETCFILTERS) { - ProcessGetCFilters(pfrom, *peer, vRecv); + ProcessGetCFilters(pfrom, peer, vRecv); return; } if (msg_type == NetMsgType::GETCFHEADERS) { - ProcessGetCFHeaders(pfrom, *peer, vRecv); + ProcessGetCFHeaders(pfrom, peer, vRecv); return; } if (msg_type == NetMsgType::GETCFCHECKPT) { - ProcessGetCFCheckPt(pfrom, *peer, vRecv); + ProcessGetCFCheckPt(pfrom, peer, vRecv); return; } @@ -5181,28 +5180,29 @@ bool PeerManagerImpl::MaybeDiscourageAndDisconnect(CNode& pnode, Peer& peer) return true; } -bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic& interruptMsgProc) +bool PeerManagerImpl::ProcessMessages(CNode& node, std::atomic& interruptMsgProc) { AssertLockNotHeld(m_tx_download_mutex); AssertLockHeld(g_msgproc_mutex); - PeerRef peer = GetPeerRef(pfrom->GetId()); - if (peer == nullptr) return false; + PeerRef maybe_peer{GetPeerRef(node.GetId())}; + if (maybe_peer == nullptr) return false; + Peer& peer{*maybe_peer}; // For outbound connections, ensure that the initial VERSION message // has been sent first before processing any incoming messages - if (!pfrom->IsInboundConn() && !peer->m_outbound_version_message_sent) return false; + if (!node.IsInboundConn() && !peer.m_outbound_version_message_sent) return false; { - LOCK(peer->m_getdata_requests_mutex); - if (!peer->m_getdata_requests.empty()) { - ProcessGetData(*pfrom, *peer, interruptMsgProc); + LOCK(peer.m_getdata_requests_mutex); + if (!peer.m_getdata_requests.empty()) { + ProcessGetData(node, peer, interruptMsgProc); } } - const bool processed_orphan = ProcessOrphanTx(*peer); + const bool processed_orphan = ProcessOrphanTx(peer); - if (pfrom->fDisconnect) + if (node.fDisconnect) return false; if (processed_orphan) return true; @@ -5210,14 +5210,14 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic& interrupt // this maintains the order of responses // and prevents m_getdata_requests to grow unbounded { - LOCK(peer->m_getdata_requests_mutex); - if (!peer->m_getdata_requests.empty()) return true; + LOCK(peer.m_getdata_requests_mutex); + if (!peer.m_getdata_requests.empty()) return true; } // Don't bother if send buffer is too full to respond anyway - if (pfrom->fPauseSend) return false; + if (node.fPauseSend) return false; - auto poll_result{pfrom->PollMessage()}; + auto poll_result{node.PollMessage()}; if (!poll_result) { // No message to process return false; @@ -5227,32 +5227,32 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic& interrupt bool fMoreWork = poll_result->second; TRACEPOINT(net, inbound_message, - pfrom->GetId(), - pfrom->m_addr_name.c_str(), - pfrom->ConnectionTypeAsString().c_str(), + node.GetId(), + node.m_addr_name.c_str(), + node.ConnectionTypeAsString().c_str(), msg.m_type.c_str(), msg.m_recv.size(), msg.m_recv.data() ); if (m_opts.capture_messages) { - CaptureMessage(pfrom->addr, msg.m_type, MakeUCharSpan(msg.m_recv), /*is_incoming=*/true); + CaptureMessage(node.addr, msg.m_type, MakeUCharSpan(msg.m_recv), /*is_incoming=*/true); } try { - ProcessMessage(*pfrom, msg.m_type, msg.m_recv, msg.m_time, interruptMsgProc); + ProcessMessage(peer, node, msg.m_type, msg.m_recv, msg.m_time, interruptMsgProc); if (interruptMsgProc) return false; { - LOCK(peer->m_getdata_requests_mutex); - if (!peer->m_getdata_requests.empty()) fMoreWork = true; + LOCK(peer.m_getdata_requests_mutex); + if (!peer.m_getdata_requests.empty()) fMoreWork = true; } - // Does this peer has an orphan ready to reconsider? + // Does this peer have an orphan ready to reconsider? // (Note: we may have provided a parent for an orphan provided // by another peer that was already processed; in that case, // the extra work may not be noticed, possibly resulting in an // unnecessary 100ms delay) LOCK(m_tx_download_mutex); - if (m_txdownloadman.HaveMoreWork(peer->m_id)) fMoreWork = true; + if (m_txdownloadman.HaveMoreWork(peer.m_id)) fMoreWork = true; } catch (const std::exception& e) { LogDebug(BCLog::NET, "%s(%s, %u bytes): Exception '%s' (%s) caught\n", __func__, SanitizeString(msg.m_type), msg.m_message_size, e.what(), typeid(e).name()); } catch (...) { @@ -5683,27 +5683,28 @@ bool PeerManagerImpl::SetupAddressRelay(const CNode& node, Peer& peer) return true; } -bool PeerManagerImpl::SendMessages(CNode* pto) +bool PeerManagerImpl::SendMessages(CNode& node) { AssertLockNotHeld(m_tx_download_mutex); AssertLockHeld(g_msgproc_mutex); - PeerRef peer = GetPeerRef(pto->GetId()); - if (!peer) return false; + PeerRef maybe_peer{GetPeerRef(node.GetId())}; + if (!maybe_peer) return false; + Peer& peer{*maybe_peer}; const Consensus::Params& consensusParams = m_chainparams.GetConsensus(); // We must call MaybeDiscourageAndDisconnect first, to ensure that we'll // disconnect misbehaving peers even before the version handshake is complete. - if (MaybeDiscourageAndDisconnect(*pto, *peer)) return true; + if (MaybeDiscourageAndDisconnect(node, peer)) return true; // Initiate version handshake for outbound connections - if (!pto->IsInboundConn() && !peer->m_outbound_version_message_sent) { - PushNodeVersion(*pto, *peer); - peer->m_outbound_version_message_sent = true; + if (!node.IsInboundConn() && !peer.m_outbound_version_message_sent) { + PushNodeVersion(node, peer); + peer.m_outbound_version_message_sent = true; } // Don't send anything until the version handshake is complete - if (!pto->fSuccessfullyConnected || pto->fDisconnect) + if (!node.fSuccessfullyConnected || node.fDisconnect) return true; const auto current_time{GetTime()}; @@ -5711,34 +5712,34 @@ bool PeerManagerImpl::SendMessages(CNode* pto) // The logic below does not apply to private broadcast peers, so skip it. // Also in CConnman::PushMessage() we make sure that unwanted messages are // not sent. This here is just an optimization. - if (pto->IsPrivateBroadcastConn()) { - if (pto->m_connected + PRIVATE_BROADCAST_MAX_CONNECTION_LIFETIME < current_time) { + if (node.IsPrivateBroadcastConn()) { + if (node.m_connected + PRIVATE_BROADCAST_MAX_CONNECTION_LIFETIME < current_time) { LogDebug(BCLog::PRIVBROADCAST, "Disconnecting: did not complete the transaction send within %d seconds, peer=%d%s", - count_seconds(PRIVATE_BROADCAST_MAX_CONNECTION_LIFETIME), pto->GetId(), pto->LogIP(fLogIPs)); - pto->fDisconnect = true; + count_seconds(PRIVATE_BROADCAST_MAX_CONNECTION_LIFETIME), node.GetId(), node.LogIP(fLogIPs)); + node.fDisconnect = true; } return true; } - if (pto->IsAddrFetchConn() && current_time - pto->m_connected > 10 * AVG_ADDRESS_BROADCAST_INTERVAL) { - LogDebug(BCLog::NET, "addrfetch connection timeout, %s\n", pto->DisconnectMsg(fLogIPs)); - pto->fDisconnect = true; + if (node.IsAddrFetchConn() && current_time - node.m_connected > 10 * AVG_ADDRESS_BROADCAST_INTERVAL) { + LogDebug(BCLog::NET, "addrfetch connection timeout, %s\n", node.DisconnectMsg(fLogIPs)); + node.fDisconnect = true; return true; } - MaybeSendPing(*pto, *peer, current_time); + MaybeSendPing(node, peer, current_time); // MaybeSendPing may have marked peer for disconnection - if (pto->fDisconnect) return true; + if (node.fDisconnect) return true; - MaybeSendAddr(*pto, *peer, current_time); + MaybeSendAddr(node, peer, current_time); - MaybeSendSendHeaders(*pto, *peer); + MaybeSendSendHeaders(node, peer); { LOCK(cs_main); - CNodeState &state = *State(pto->GetId()); + CNodeState &state = *State(node.GetId()); // Start block sync if (m_chainman.m_best_header == nullptr) { @@ -5751,7 +5752,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto) bool sync_blocks_and_headers_from_peer = false; if (state.fPreferredDownload) { sync_blocks_and_headers_from_peer = true; - } else if (CanServeBlocks(*peer) && !pto->IsAddrFetchConn()) { + } else if (CanServeBlocks(peer) && !node.IsAddrFetchConn()) { // Typically this is an inbound peer. If we don't have any outbound // peers, or if we aren't downloading any blocks from such peers, // then allow block downloads from this peer, too. @@ -5766,7 +5767,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto) } } - if (!state.fSyncStarted && CanServeBlocks(*peer) && !m_chainman.m_blockman.LoadingBlocks()) { + if (!state.fSyncStarted && CanServeBlocks(peer) && !m_chainman.m_blockman.LoadingBlocks()) { // Only actively request headers from a single peer, unless we're close to today. if ((nSyncStarted == 0 && sync_blocks_and_headers_from_peer) || m_chainman.m_best_header->Time() > NodeClock::now() - 24h) { const CBlockIndex* pindexStart = m_chainman.m_best_header; @@ -5779,11 +5780,11 @@ bool PeerManagerImpl::SendMessages(CNode* pto) got back an empty response. */ if (pindexStart->pprev) pindexStart = pindexStart->pprev; - if (MaybeSendGetHeaders(*pto, GetLocator(pindexStart), *peer)) { - LogDebug(BCLog::NET, "initial getheaders (%d) to peer=%d (startheight:%d)\n", pindexStart->nHeight, pto->GetId(), peer->m_starting_height); + if (MaybeSendGetHeaders(node, GetLocator(pindexStart), peer)) { + LogDebug(BCLog::NET, "initial getheaders (%d) to peer=%d (startheight:%d)\n", pindexStart->nHeight, node.GetId(), peer.m_starting_height); state.fSyncStarted = true; - peer->m_headers_sync_timeout = current_time + HEADERS_DOWNLOAD_TIMEOUT_BASE + + peer.m_headers_sync_timeout = current_time + HEADERS_DOWNLOAD_TIMEOUT_BASE + ( // Convert HEADERS_DOWNLOAD_TIMEOUT_PER_HEADER to microseconds before scaling // to maintain precision @@ -5806,20 +5807,20 @@ bool PeerManagerImpl::SendMessages(CNode* pto) // If no header would connect, or if we have too many // blocks, or if the peer doesn't want headers, just // add all to the inv queue. - LOCK(peer->m_block_inv_mutex); + LOCK(peer.m_block_inv_mutex); std::vector vHeaders; - bool fRevertToInv = ((!peer->m_prefers_headers && - (!state.m_requested_hb_cmpctblocks || peer->m_blocks_for_headers_relay.size() > 1)) || - peer->m_blocks_for_headers_relay.size() > MAX_BLOCKS_TO_ANNOUNCE); + bool fRevertToInv = ((!peer.m_prefers_headers && + (!state.m_requested_hb_cmpctblocks || peer.m_blocks_for_headers_relay.size() > 1)) || + peer.m_blocks_for_headers_relay.size() > MAX_BLOCKS_TO_ANNOUNCE); const CBlockIndex *pBestIndex = nullptr; // last header queued for delivery - ProcessBlockAvailability(pto->GetId()); // ensure pindexBestKnownBlock is up-to-date + ProcessBlockAvailability(node.GetId()); // ensure pindexBestKnownBlock is up-to-date if (!fRevertToInv) { bool fFoundStartingHeader = false; // Try to find first header that our peer doesn't have, and // then send all headers past that one. If we come across any // headers that aren't on m_chainman.ActiveChain(), give up. - for (const uint256& hash : peer->m_blocks_for_headers_relay) { + for (const uint256& hash : peer.m_blocks_for_headers_relay) { const CBlockIndex* pindex = m_chainman.m_blockman.LookupBlockIndex(hash); assert(pindex); if (m_chainman.ActiveChain()[pindex->nHeight] != pindex) { @@ -5866,7 +5867,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto) // We only send up to 1 block as header-and-ids, as otherwise // probably means we're doing an initial-ish-sync or they're slow LogDebug(BCLog::NET, "%s sending header-and-ids %s to peer=%d\n", __func__, - vHeaders.front().GetHash().ToString(), pto->GetId()); + vHeaders.front().GetHash().ToString(), node.GetId()); std::optional cached_cmpctblock_msg; { @@ -5876,26 +5877,26 @@ bool PeerManagerImpl::SendMessages(CNode* pto) } } if (cached_cmpctblock_msg.has_value()) { - PushMessage(*pto, std::move(cached_cmpctblock_msg.value())); + PushMessage(node, std::move(cached_cmpctblock_msg.value())); } else { CBlock block; const bool ret{m_chainman.m_blockman.ReadBlock(block, *pBestIndex)}; assert(ret); CBlockHeaderAndShortTxIDs cmpctblock{block, m_rng.rand64()}; - MakeAndPushMessage(*pto, NetMsgType::CMPCTBLOCK, cmpctblock); + MakeAndPushMessage(node, NetMsgType::CMPCTBLOCK, cmpctblock); } state.pindexBestHeaderSent = pBestIndex; - } else if (peer->m_prefers_headers) { + } else if (peer.m_prefers_headers) { if (vHeaders.size() > 1) { LogDebug(BCLog::NET, "%s: %u headers, range (%s, %s), to peer=%d\n", __func__, vHeaders.size(), vHeaders.front().GetHash().ToString(), - vHeaders.back().GetHash().ToString(), pto->GetId()); + vHeaders.back().GetHash().ToString(), node.GetId()); } else { LogDebug(BCLog::NET, "%s: sending header %s to peer=%d\n", __func__, - vHeaders.front().GetHash().ToString(), pto->GetId()); + vHeaders.front().GetHash().ToString(), node.GetId()); } - MakeAndPushMessage(*pto, NetMsgType::HEADERS, TX_WITH_WITNESS(vHeaders)); + MakeAndPushMessage(node, NetMsgType::HEADERS, TX_WITH_WITNESS(vHeaders)); state.pindexBestHeaderSent = pBestIndex; } else fRevertToInv = true; @@ -5904,8 +5905,8 @@ bool PeerManagerImpl::SendMessages(CNode* pto) // If falling back to using an inv, just try to inv the tip. // The last entry in m_blocks_for_headers_relay was our tip at some point // in the past. - if (!peer->m_blocks_for_headers_relay.empty()) { - const uint256& hashToAnnounce = peer->m_blocks_for_headers_relay.back(); + if (!peer.m_blocks_for_headers_relay.empty()) { + const uint256& hashToAnnounce = peer.m_blocks_for_headers_relay.back(); const CBlockIndex* pindex = m_chainman.m_blockman.LookupBlockIndex(hashToAnnounce); assert(pindex); @@ -5919,13 +5920,13 @@ bool PeerManagerImpl::SendMessages(CNode* pto) // If the peer's chain has this block, don't inv it back. if (!PeerHasHeader(&state, pindex)) { - peer->m_blocks_for_inv_relay.push_back(hashToAnnounce); + peer.m_blocks_for_inv_relay.push_back(hashToAnnounce); LogDebug(BCLog::NET, "%s: sending inv peer=%d hash=%s\n", __func__, - pto->GetId(), hashToAnnounce.ToString()); + node.GetId(), hashToAnnounce.ToString()); } } } - peer->m_blocks_for_headers_relay.clear(); + peer.m_blocks_for_headers_relay.clear(); } // @@ -5933,28 +5934,28 @@ bool PeerManagerImpl::SendMessages(CNode* pto) // std::vector vInv; { - LOCK(peer->m_block_inv_mutex); - vInv.reserve(std::max(peer->m_blocks_for_inv_relay.size(), INVENTORY_BROADCAST_TARGET)); + LOCK(peer.m_block_inv_mutex); + vInv.reserve(std::max(peer.m_blocks_for_inv_relay.size(), INVENTORY_BROADCAST_TARGET)); // Add blocks - for (const uint256& hash : peer->m_blocks_for_inv_relay) { + for (const uint256& hash : peer.m_blocks_for_inv_relay) { vInv.emplace_back(MSG_BLOCK, hash); if (vInv.size() == MAX_INV_SZ) { - MakeAndPushMessage(*pto, NetMsgType::INV, vInv); + MakeAndPushMessage(node, NetMsgType::INV, vInv); vInv.clear(); } } - peer->m_blocks_for_inv_relay.clear(); + peer.m_blocks_for_inv_relay.clear(); } - if (auto tx_relay = peer->GetTxRelay(); tx_relay != nullptr) { + if (auto tx_relay = peer.GetTxRelay(); tx_relay != nullptr) { LOCK(tx_relay->m_tx_inventory_mutex); // Check whether periodic sends should happen - bool fSendTrickle = pto->HasPermission(NetPermissionFlags::NoBan); + bool fSendTrickle = node.HasPermission(NetPermissionFlags::NoBan); if (tx_relay->m_next_inv_send_time < current_time) { fSendTrickle = true; - if (pto->IsInboundConn()) { - tx_relay->m_next_inv_send_time = NextInvToInbounds(current_time, INBOUND_INVENTORY_BROADCAST_INTERVAL, pto->m_network_key); + if (node.IsInboundConn()) { + tx_relay->m_next_inv_send_time = NextInvToInbounds(current_time, INBOUND_INVENTORY_BROADCAST_INTERVAL, node.m_network_key); } else { tx_relay->m_next_inv_send_time = current_time + m_rng.rand_exp_duration(OUTBOUND_INVENTORY_BROADCAST_INTERVAL); } @@ -5977,7 +5978,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto) for (const auto& txinfo : vtxinfo) { const Txid& txid{txinfo.tx->GetHash()}; const Wtxid& wtxid{txinfo.tx->GetWitnessHash()}; - const auto inv = peer->m_wtxid_relay ? + const auto inv = peer.m_wtxid_relay ? CInv{MSG_WTX, wtxid.ToUint256()} : CInv{MSG_TX, txid.ToUint256()}; tx_relay->m_tx_inventory_to_send.erase(wtxid); @@ -5992,7 +5993,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto) tx_relay->m_tx_inventory_known_filter.insert(inv.hash); vInv.push_back(inv); if (vInv.size() == MAX_INV_SZ) { - MakeAndPushMessage(*pto, NetMsgType::INV, vInv); + MakeAndPushMessage(node, NetMsgType::INV, vInv); vInv.clear(); } } @@ -6033,7 +6034,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto) // `TxRelay::m_tx_inventory_known_filter` contains either txids or wtxids // depending on whether our peer supports wtxid-relay. Therefore, first // construct the inv and then use its hash for the filter check. - const auto inv = peer->m_wtxid_relay ? + const auto inv = peer.m_wtxid_relay ? CInv{MSG_WTX, wtxid.ToUint256()} : CInv{MSG_TX, txinfo.tx->GetHash().ToUint256()}; // Check if not in the filter already @@ -6049,7 +6050,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto) vInv.push_back(inv); nRelayedTransactions++; if (vInv.size() == MAX_INV_SZ) { - MakeAndPushMessage(*pto, NetMsgType::INV, vInv); + MakeAndPushMessage(node, NetMsgType::INV, vInv); vInv.clear(); } tx_relay->m_tx_inventory_known_filter.insert(inv.hash); @@ -6061,7 +6062,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto) } } if (!vInv.empty()) - MakeAndPushMessage(*pto, NetMsgType::INV, vInv); + MakeAndPushMessage(node, NetMsgType::INV, vInv); // Detect whether we're stalling auto stalling_timeout = m_block_stalling_timeout.load(); @@ -6069,8 +6070,8 @@ bool PeerManagerImpl::SendMessages(CNode* pto) // Stalling only triggers when the block download window cannot move. During normal steady state, // the download window should be much larger than the to-be-downloaded set of blocks, so disconnection // should only happen during initial block download. - LogInfo("Peer is stalling block download, %s\n", pto->DisconnectMsg(fLogIPs)); - pto->fDisconnect = true; + LogInfo("Peer is stalling block download, %s\n", node.DisconnectMsg(fLogIPs)); + node.fDisconnect = true; // Increase timeout for the next peer so that we don't disconnect multiple peers if our own // bandwidth is insufficient. const auto new_timeout = std::min(2 * stalling_timeout, BLOCK_STALLING_TIMEOUT_MAX); @@ -6088,27 +6089,27 @@ bool PeerManagerImpl::SendMessages(CNode* pto) QueuedBlock &queuedBlock = state.vBlocksInFlight.front(); int nOtherPeersWithValidatedDownloads = m_peers_downloading_from - 1; if (current_time > state.m_downloading_since + std::chrono::seconds{consensusParams.nPowTargetSpacing} * (BLOCK_DOWNLOAD_TIMEOUT_BASE + BLOCK_DOWNLOAD_TIMEOUT_PER_PEER * nOtherPeersWithValidatedDownloads)) { - LogInfo("Timeout downloading block %s, %s\n", queuedBlock.pindex->GetBlockHash().ToString(), pto->DisconnectMsg(fLogIPs)); - pto->fDisconnect = true; + LogInfo("Timeout downloading block %s, %s\n", queuedBlock.pindex->GetBlockHash().ToString(), node.DisconnectMsg(fLogIPs)); + node.fDisconnect = true; return true; } } // Check for headers sync timeouts - if (state.fSyncStarted && peer->m_headers_sync_timeout < std::chrono::microseconds::max()) { + if (state.fSyncStarted && peer.m_headers_sync_timeout < std::chrono::microseconds::max()) { // Detect whether this is a stalling initial-headers-sync peer if (m_chainman.m_best_header->Time() <= NodeClock::now() - 24h) { - if (current_time > peer->m_headers_sync_timeout && nSyncStarted == 1 && (m_num_preferred_download_peers - state.fPreferredDownload >= 1)) { + if (current_time > peer.m_headers_sync_timeout && nSyncStarted == 1 && (m_num_preferred_download_peers - state.fPreferredDownload >= 1)) { // Disconnect a peer (without NetPermissionFlags::NoBan permission) if it is our only sync peer, // and we have others we could be using instead. // Note: If all our peers are inbound, then we won't // disconnect our sync peer for stalling; we have bigger // problems if we can't get any outbound peers. - if (!pto->HasPermission(NetPermissionFlags::NoBan)) { - LogInfo("Timeout downloading headers, %s\n", pto->DisconnectMsg(fLogIPs)); - pto->fDisconnect = true; + if (!node.HasPermission(NetPermissionFlags::NoBan)) { + LogInfo("Timeout downloading headers, %s\n", node.DisconnectMsg(fLogIPs)); + node.fDisconnect = true; return true; } else { - LogInfo("Timeout downloading headers from noban peer, not %s\n", pto->DisconnectMsg(fLogIPs)); + LogInfo("Timeout downloading headers from noban peer, not %s\n", node.DisconnectMsg(fLogIPs)); // Reset the headers sync state so that we have a // chance to try downloading from a different peer. // Note: this will also result in at least one more @@ -6116,25 +6117,25 @@ bool PeerManagerImpl::SendMessages(CNode* pto) // this peer (eventually). state.fSyncStarted = false; nSyncStarted--; - peer->m_headers_sync_timeout = 0us; + peer.m_headers_sync_timeout = 0us; } } } else { // After we've caught up once, reset the timeout so we can't trigger // disconnect later. - peer->m_headers_sync_timeout = std::chrono::microseconds::max(); + peer.m_headers_sync_timeout = std::chrono::microseconds::max(); } } // Check that outbound peers have reasonable chains // GetTime() is used by this anti-DoS logic so we can test this using mocktime - ConsiderEviction(*pto, *peer, GetTime()); + ConsiderEviction(node, peer, GetTime()); // // Message: getdata (blocks) // std::vector vGetData; - if (CanServeBlocks(*peer) && ((sync_blocks_and_headers_from_peer && !IsLimitedPeer(*peer)) || !m_chainman.IsInitialBlockDownload()) && state.vBlocksInFlight.size() < MAX_BLOCKS_IN_TRANSIT_PER_PEER) { + if (CanServeBlocks(peer) && ((sync_blocks_and_headers_from_peer && !IsLimitedPeer(peer)) || !m_chainman.IsInitialBlockDownload()) && state.vBlocksInFlight.size() < MAX_BLOCKS_IN_TRANSIT_PER_PEER) { std::vector vToDownload; NodeId staller = -1; auto get_inflight_budget = [&state]() { @@ -6144,23 +6145,23 @@ bool PeerManagerImpl::SendMessages(CNode* pto) // If there are multiple chainstates, download blocks for the // current chainstate first, to prioritize getting to network tip // before downloading historical blocks. - FindNextBlocksToDownload(*peer, get_inflight_budget(), vToDownload, staller); + FindNextBlocksToDownload(peer, get_inflight_budget(), vToDownload, staller); auto historical_blocks{m_chainman.GetHistoricalBlockRange()}; - if (historical_blocks && !IsLimitedPeer(*peer)) { + if (historical_blocks && !IsLimitedPeer(peer)) { // If the first needed historical block is not an ancestor of the last, // we need to start requesting blocks from their last common ancestor. const CBlockIndex* from_tip = LastCommonAncestor(historical_blocks->first, historical_blocks->second); TryDownloadingHistoricalBlocks( - *peer, + peer, get_inflight_budget(), vToDownload, from_tip, historical_blocks->second); } for (const CBlockIndex *pindex : vToDownload) { - uint32_t nFetchFlags = GetFetchFlags(*peer); + uint32_t nFetchFlags = GetFetchFlags(peer); vGetData.emplace_back(MSG_BLOCK | nFetchFlags, pindex->GetBlockHash()); - BlockRequested(pto->GetId(), *pindex); + BlockRequested(node.GetId(), *pindex); LogDebug(BCLog::NET, "Requesting block %s (%d) peer=%d\n", pindex->GetBlockHash().ToString(), - pindex->nHeight, pto->GetId()); + pindex->nHeight, node.GetId()); } if (state.vBlocksInFlight.empty() && staller != -1) { if (State(staller)->m_stalling_since == 0us) { @@ -6175,18 +6176,18 @@ bool PeerManagerImpl::SendMessages(CNode* pto) // { LOCK(m_tx_download_mutex); - for (const GenTxid& gtxid : m_txdownloadman.GetRequestsToSend(pto->GetId(), current_time)) { - vGetData.emplace_back(gtxid.IsWtxid() ? MSG_WTX : (MSG_TX | GetFetchFlags(*peer)), gtxid.ToUint256()); + for (const GenTxid& gtxid : m_txdownloadman.GetRequestsToSend(node.GetId(), current_time)) { + vGetData.emplace_back(gtxid.IsWtxid() ? MSG_WTX : (MSG_TX | GetFetchFlags(peer)), gtxid.ToUint256()); if (vGetData.size() >= MAX_GETDATA_SZ) { - MakeAndPushMessage(*pto, NetMsgType::GETDATA, vGetData); + MakeAndPushMessage(node, NetMsgType::GETDATA, vGetData); vGetData.clear(); } } } if (!vGetData.empty()) - MakeAndPushMessage(*pto, NetMsgType::GETDATA, vGetData); + MakeAndPushMessage(node, NetMsgType::GETDATA, vGetData); } // release cs_main - MaybeSendFeefilter(*pto, *peer, current_time); + MaybeSendFeefilter(node, peer, current_time); return true; } diff --git a/src/net_processing.h b/src/net_processing.h index 4b221c5d979..504e708d702 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -147,10 +147,6 @@ public: */ virtual void CheckForStaleTipAndEvictPeers() = 0; - /** Process a single message from a peer. Public for fuzz testing */ - virtual void ProcessMessage(CNode& pfrom, const std::string& msg_type, DataStream& vRecv, - std::chrono::microseconds time_received, const std::atomic& interruptMsgProc) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex) = 0; - /** This function is used for testing the stale tip eviction logic, see denialofservice_tests.cpp */ virtual void UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) = 0; diff --git a/src/test/denialofservice_tests.cpp b/src/test/denialofservice_tests.cpp index 0dc0323028e..776a18788e3 100644 --- a/src/test/denialofservice_tests.cpp +++ b/src/test/denialofservice_tests.cpp @@ -81,7 +81,7 @@ BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction) } // Test starts here - BOOST_CHECK(peerman.SendMessages(&dummyNode1)); // should result in getheaders + BOOST_CHECK(peerman.SendMessages(dummyNode1)); // should result in getheaders { LOCK(dummyNode1.cs_vSend); @@ -93,7 +93,7 @@ BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction) int64_t nStartTime = GetTime(); // Wait 21 minutes SetMockTime(nStartTime+21*60); - BOOST_CHECK(peerman.SendMessages(&dummyNode1)); // should result in getheaders + BOOST_CHECK(peerman.SendMessages(dummyNode1)); // should result in getheaders { LOCK(dummyNode1.cs_vSend); const auto& [to_send, _more, _msg_type] = dummyNode1.m_transport->GetBytesToSend(false); @@ -101,7 +101,7 @@ BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction) } // Wait 3 more minutes SetMockTime(nStartTime+24*60); - BOOST_CHECK(peerman.SendMessages(&dummyNode1)); // should result in disconnect + BOOST_CHECK(peerman.SendMessages(dummyNode1)); // should result in disconnect BOOST_CHECK(dummyNode1.fDisconnect == true); peerman.FinalizeNode(dummyNode1); @@ -336,7 +336,7 @@ BOOST_AUTO_TEST_CASE(peer_discouragement) nodes[0]->fSuccessfullyConnected = true; connman->AddTestNode(*nodes[0]); peerLogic->UnitTestMisbehaving(nodes[0]->GetId()); // Should be discouraged - BOOST_CHECK(peerLogic->SendMessages(nodes[0])); + BOOST_CHECK(peerLogic->SendMessages(*nodes[0])); BOOST_CHECK(banman->IsDiscouraged(addr[0])); BOOST_CHECK(nodes[0]->fDisconnect); @@ -356,7 +356,7 @@ BOOST_AUTO_TEST_CASE(peer_discouragement) peerLogic->InitializeNode(*nodes[1], NODE_NETWORK); nodes[1]->fSuccessfullyConnected = true; connman->AddTestNode(*nodes[1]); - BOOST_CHECK(peerLogic->SendMessages(nodes[1])); + BOOST_CHECK(peerLogic->SendMessages(*nodes[1])); // [0] is still discouraged/disconnected. BOOST_CHECK(banman->IsDiscouraged(addr[0])); BOOST_CHECK(nodes[0]->fDisconnect); @@ -364,7 +364,7 @@ BOOST_AUTO_TEST_CASE(peer_discouragement) BOOST_CHECK(!banman->IsDiscouraged(addr[1])); BOOST_CHECK(!nodes[1]->fDisconnect); peerLogic->UnitTestMisbehaving(nodes[1]->GetId()); - BOOST_CHECK(peerLogic->SendMessages(nodes[1])); + BOOST_CHECK(peerLogic->SendMessages(*nodes[1])); // Expect both [0] and [1] to be discouraged/disconnected now. BOOST_CHECK(banman->IsDiscouraged(addr[0])); BOOST_CHECK(nodes[0]->fDisconnect); @@ -388,7 +388,7 @@ BOOST_AUTO_TEST_CASE(peer_discouragement) nodes[2]->fSuccessfullyConnected = true; connman->AddTestNode(*nodes[2]); peerLogic->UnitTestMisbehaving(nodes[2]->GetId()); - BOOST_CHECK(peerLogic->SendMessages(nodes[2])); + BOOST_CHECK(peerLogic->SendMessages(*nodes[2])); BOOST_CHECK(banman->IsDiscouraged(addr[0])); BOOST_CHECK(banman->IsDiscouraged(addr[1])); BOOST_CHECK(banman->IsDiscouraged(addr[2])); @@ -431,7 +431,7 @@ BOOST_AUTO_TEST_CASE(DoS_bantime) dummyNode.fSuccessfullyConnected = true; peerLogic->UnitTestMisbehaving(dummyNode.GetId()); - BOOST_CHECK(peerLogic->SendMessages(&dummyNode)); + BOOST_CHECK(peerLogic->SendMessages(dummyNode)); BOOST_CHECK(banman->IsDiscouraged(addr)); peerLogic->FinalizeNode(dummyNode); diff --git a/src/test/fuzz/p2p_handshake.cpp b/src/test/fuzz/p2p_handshake.cpp index a56e1e5f1fb..7ca0a06a8b0 100644 --- a/src/test/fuzz/p2p_handshake.cpp +++ b/src/test/fuzz/p2p_handshake.cpp @@ -100,7 +100,7 @@ FUZZ_TARGET(p2p_handshake, .init = ::initialize) more_work = connman.ProcessMessagesOnce(connection); } catch (const std::ios_base::failure&) { } - peerman->SendMessages(&connection); + peerman->SendMessages(connection); } } diff --git a/src/test/fuzz/p2p_headers_presync.cpp b/src/test/fuzz/p2p_headers_presync.cpp index e9de1850f5a..25a83fef4ea 100644 --- a/src/test/fuzz/p2p_headers_presync.cpp +++ b/src/test/fuzz/p2p_headers_presync.cpp @@ -97,7 +97,7 @@ void HeadersSyncSetup::SendMessage(FuzzedDataProvider& fuzzed_data_provider, CSe connman.ProcessMessagesOnce(connection); } catch (const std::ios_base::failure&) { } - m_node.peerman->SendMessages(&connection); + m_node.peerman->SendMessages(connection); } CBlockHeader ConsumeHeader(FuzzedDataProvider& fuzzed_data_provider, const uint256& prev_hash, uint32_t prev_nbits) diff --git a/src/test/fuzz/process_message.cpp b/src/test/fuzz/process_message.cpp index 7a24c1de348..5cff747c533 100644 --- a/src/test/fuzz/process_message.cpp +++ b/src/test/fuzz/process_message.cpp @@ -123,7 +123,7 @@ FUZZ_TARGET(process_message, .init = initialize_process_message) more_work = connman.ProcessMessagesOnce(p2p_node); } catch (const std::ios_base::failure&) { } - node.peerman->SendMessages(&p2p_node); + node.peerman->SendMessages(p2p_node); } node.validation_signals->SyncWithValidationInterfaceQueue(); node.connman->StopNodes(); diff --git a/src/test/fuzz/process_messages.cpp b/src/test/fuzz/process_messages.cpp index 28bee67d37e..852293e2304 100644 --- a/src/test/fuzz/process_messages.cpp +++ b/src/test/fuzz/process_messages.cpp @@ -122,7 +122,7 @@ FUZZ_TARGET(process_messages, .init = initialize_process_messages) more_work = connman.ProcessMessagesOnce(random_node); } catch (const std::ios_base::failure&) { } - node.peerman->SendMessages(&random_node); + node.peerman->SendMessages(random_node); } } node.validation_signals->SyncWithValidationInterfaceQueue(); diff --git a/src/test/fuzz/util/net.h b/src/test/fuzz/util/net.h index 2c169392df8..862153ab26f 100644 --- a/src/test/fuzz/util/net.h +++ b/src/test/fuzz/util/net.h @@ -151,9 +151,9 @@ public: virtual bool HasAllDesirableServiceFlags(ServiceFlags) const override { return m_fdp.ConsumeBool(); } - virtual bool ProcessMessages(CNode*, std::atomic&) override { return m_fdp.ConsumeBool(); } + virtual bool ProcessMessages(CNode&, std::atomic&) override { return m_fdp.ConsumeBool(); } - virtual bool SendMessages(CNode*) override { return m_fdp.ConsumeBool(); } + virtual bool SendMessages(CNode&) override { return m_fdp.ConsumeBool(); } private: FuzzedDataProvider& m_fdp; diff --git a/src/test/net_tests.cpp b/src/test/net_tests.cpp index aea29169bb3..72a06a68df8 100644 --- a/src/test/net_tests.cpp +++ b/src/test/net_tests.cpp @@ -6,7 +6,6 @@ #include #include #include -#include #include #include #include @@ -16,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -26,6 +26,7 @@ #include #include +#include #include #include #include @@ -802,6 +803,7 @@ BOOST_AUTO_TEST_CASE(LocalAddress_BasicLifecycle) BOOST_AUTO_TEST_CASE(initial_advertise_from_version_message) { LOCK(NetEventsInterface::g_msgproc_mutex); + auto& connman{static_cast(*m_node.connman)}; // Tests the following scenario: // * -bind=3.4.5.6:20001 is specified @@ -845,22 +847,24 @@ BOOST_AUTO_TEST_CASE(initial_advertise_from_version_message) m_node.peerman->InitializeNode(peer, NODE_NETWORK); - std::atomic interrupt_dummy{false}; - std::chrono::microseconds time_received_dummy{0}; + m_node.peerman->SendMessages(peer); + connman.FlushSendBuffer(peer); // Drop sent version message - const auto msg_version = + auto msg_version_receive = NetMsg::Make(NetMsgType::VERSION, PROTOCOL_VERSION, services, time, services, CAddress::V1_NETWORK(peer_us)); - DataStream msg_version_stream{msg_version.data}; + Assert(connman.ReceiveMsgFrom(peer, std::move(msg_version_receive))); + peer.fPauseSend = false; + bool more_work{connman.ProcessMessagesOnce(peer)}; + Assert(!more_work); - m_node.peerman->ProcessMessage( - peer, NetMsgType::VERSION, msg_version_stream, time_received_dummy, interrupt_dummy); - - const auto msg_verack = NetMsg::Make(NetMsgType::VERACK); - DataStream msg_verack_stream{msg_verack.data}; + m_node.peerman->SendMessages(peer); + connman.FlushSendBuffer(peer); // Drop sent verack message + Assert(connman.ReceiveMsgFrom(peer, NetMsg::Make(NetMsgType::VERACK))); + peer.fPauseSend = false; // Will set peer.fSuccessfullyConnected to true (necessary in SendMessages()). - m_node.peerman->ProcessMessage( - peer, NetMsgType::VERACK, msg_verack_stream, time_received_dummy, interrupt_dummy); + more_work = connman.ProcessMessagesOnce(peer); + Assert(!more_work); // Ensure that peer_us_addr:bind_port is sent to the peer. const CService expected{peer_us_addr, bind_port}; @@ -886,7 +890,7 @@ BOOST_AUTO_TEST_CASE(initial_advertise_from_version_message) } }; - m_node.peerman->SendMessages(&peer); + m_node.peerman->SendMessages(peer); BOOST_CHECK(sent); diff --git a/src/test/util/net.cpp b/src/test/util/net.cpp index 0f59e0d6c84..0979e2ae45d 100644 --- a/src/test/util/net.cpp +++ b/src/test/util/net.cpp @@ -31,7 +31,7 @@ void ConnmanTestMsg::Handshake(CNode& node, auto& connman{*this}; peerman.InitializeNode(node, local_services); - peerman.SendMessages(&node); + peerman.SendMessages(node); FlushSendBuffer(node); // Drop the version message added by SendMessages. CSerializedNetMsg msg_version{ @@ -52,7 +52,7 @@ void ConnmanTestMsg::Handshake(CNode& node, (void)connman.ReceiveMsgFrom(node, std::move(msg_version)); node.fPauseSend = false; connman.ProcessMessagesOnce(node); - peerman.SendMessages(&node); + peerman.SendMessages(node); FlushSendBuffer(node); // Drop the verack message added by SendMessages. if (node.fDisconnect) return; assert(node.nVersion == version); @@ -66,7 +66,7 @@ void ConnmanTestMsg::Handshake(CNode& node, (void)connman.ReceiveMsgFrom(node, std::move(msg_verack)); node.fPauseSend = false; connman.ProcessMessagesOnce(node); - peerman.SendMessages(&node); + peerman.SendMessages(node); assert(node.fSuccessfullyConnected == true); } } diff --git a/src/test/util/net.h b/src/test/util/net.h index ee02d404ec0..732aefb3742 100644 --- a/src/test/util/net.h +++ b/src/test/util/net.h @@ -101,7 +101,7 @@ struct ConnmanTestMsg : public CConnman { bool ProcessMessagesOnce(CNode& node) EXCLUSIVE_LOCKS_REQUIRED(NetEventsInterface::g_msgproc_mutex) { - return m_msgproc->ProcessMessages(&node, flagInterruptMsgProc); + return m_msgproc->ProcessMessages(node, flagInterruptMsgProc); } void NodeReceiveMsgBytes(CNode& node, std::span msg_bytes, bool& complete) const;