net_processing: retry private broadcast

Periodically check for stale transactions in peerman and if found,
reschedule new connections to be opened by connman for broadcasting
them.
This commit is contained in:
Vasil Dimov
2024-01-30 18:25:23 +01:00
parent 37b79f9c39
commit eab595f9cf
5 changed files with 67 additions and 0 deletions

View File

@@ -566,6 +566,9 @@ private:
/** Retrieve unbroadcast transactions from the mempool and reattempt sending to peers */
void ReattemptInitialBroadcast(CScheduler& scheduler) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
/** Rebroadcast stale private transactions (already broadcast but not received back from the network). */
void ReattemptPrivateBroadcast(CScheduler& scheduler);
/** Get a shared pointer to the Peer object.
* May return an empty shared_ptr if the Peer object can't be found. */
PeerRef GetPeerRef(NodeId id) const EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
@@ -1633,6 +1636,37 @@ void PeerManagerImpl::ReattemptInitialBroadcast(CScheduler& scheduler)
scheduler.scheduleFromNow([&] { ReattemptInitialBroadcast(scheduler); }, delta);
}
void PeerManagerImpl::ReattemptPrivateBroadcast(CScheduler& scheduler)
{
// Remove stale transactions that are no longer relevant (e.g. already in
// the mempool or mined) and count the remaining ones.
size_t num_for_rebroadcast{0};
const auto stale_txs = m_tx_for_private_broadcast.GetStale();
if (!stale_txs.empty()) {
LOCK(cs_main);
for (const auto& stale_tx : stale_txs) {
auto mempool_acceptable = m_chainman.ProcessTransaction(stale_tx, /*test_accept=*/true);
if (mempool_acceptable.m_result_type == MempoolAcceptResult::ResultType::VALID) {
LogDebug(BCLog::PRIVBROADCAST,
"Reattempting broadcast of stale txid=%s wtxid=%s",
stale_tx->GetHash().ToString(), stale_tx->GetWitnessHash().ToString());
++num_for_rebroadcast;
} else {
LogInfo("[privatebroadcast] Giving up broadcast attempts for txid=%s wtxid=%s: %s",
stale_tx->GetHash().ToString(), stale_tx->GetWitnessHash().ToString(),
mempool_acceptable.m_state.ToString());
m_tx_for_private_broadcast.Remove(stale_tx);
}
}
// This could overshoot, but that is ok - we will open some private connections in vain.
m_connman.m_private_broadcast.NumToOpenAdd(num_for_rebroadcast);
}
const auto delta{2min + FastRandomContext().randrange<std::chrono::milliseconds>(1min)};
scheduler.scheduleFromNow([&] { ReattemptPrivateBroadcast(scheduler); }, delta);
}
void PeerManagerImpl::FinalizeNode(const CNode& node)
{
NodeId nodeid = node.GetId();
@@ -1971,6 +2005,10 @@ void PeerManagerImpl::StartScheduledTasks(CScheduler& scheduler)
// schedule next run for 10-15 minutes in the future
const auto delta = 10min + FastRandomContext().randrange<std::chrono::milliseconds>(5min);
scheduler.scheduleFromNow([&] { ReattemptInitialBroadcast(scheduler); }, delta);
if (m_opts.private_broadcast) {
scheduler.scheduleFromNow([&] { ReattemptPrivateBroadcast(scheduler); }, 0min);
}
}
void PeerManagerImpl::ActiveTipChange(const CBlockIndex& new_tip, bool is_ibd)

View File

@@ -90,6 +90,8 @@ public:
//! Number of headers sent in one getheaders message result (this is
//! a test-only option).
uint32_t max_headers_result{MAX_HEADERS_RESULTS};
//! Whether private broadcast is used for sending transactions.
bool private_broadcast{DEFAULT_PRIVATE_BROADCAST};
};
static std::unique_ptr<PeerManager> make(CConnman& connman, AddrMan& addrman,

View File

@@ -23,6 +23,8 @@ void ApplyArgsManOptions(const ArgsManager& argsman, PeerManager::Options& optio
if (auto value{argsman.GetBoolArg("-capturemessages")}) options.capture_messages = *value;
if (auto value{argsman.GetBoolArg("-blocksonly")}) options.ignore_incoming_txs = *value;
if (auto value{argsman.GetBoolArg("-privatebroadcast")}) options.private_broadcast = *value;
}
} // namespace node

View File

@@ -7,6 +7,10 @@
#include <algorithm>
/// If a transaction is not received back from the network for this duration
/// after it is broadcast, then we consider it stale / for rebroadcasting.
static constexpr auto STALE_DURATION{1min};
bool PrivateBroadcast::Add(const CTransactionRef& tx)
EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
@@ -85,6 +89,21 @@ bool PrivateBroadcast::HavePendingTransactions()
return !m_transactions.empty();
}
std::vector<CTransactionRef> PrivateBroadcast::GetStale() const
EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
LOCK(m_mutex);
const auto stale_time{NodeClock::now() - STALE_DURATION};
std::vector<CTransactionRef> stale;
for (const auto& [tx, send_status] : m_transactions) {
const Priority p{DerivePriority(send_status)};
if (p.last_confirmed < stale_time) {
stale.push_back(tx);
}
}
return stale;
}
PrivateBroadcast::Priority PrivateBroadcast::DerivePriority(const std::vector<SendStatus>& sent_to)
{
Priority p;

View File

@@ -89,6 +89,12 @@ public:
bool HavePendingTransactions()
EXCLUSIVE_LOCKS_REQUIRED(!m_mutex);
/**
* Get the transactions that have not been broadcast recently.
*/
std::vector<CTransactionRef> GetStale() const
EXCLUSIVE_LOCKS_REQUIRED(!m_mutex);
private:
/// Status of a transaction sent to a given node.
struct SendStatus {