mirror of
https://github.com/bitcoin/bitcoin.git
synced 2026-02-09 02:59:31 +08:00
Merge bitcoin/bitcoin#32466: threading: drop CSemaphore in favor of c++20 std::counting_semaphore
6f7052a7b9threading: semaphore: move CountingSemaphoreGrant to its own header (Cory Fields)fd15469892threading: semaphore: remove temporary convenience types (Cory Fields)1f89e2a49ascripted-diff: threading: semaphore: use direct types rather than the temporary convenience ones (Cory Fields)f21365c4fcthreading: replace CountingSemaphore with std::counting_semaphore (Cory Fields)1acacfbad7threading: make CountingSemaphore/CountingSemaphoreGrant template types (Cory Fields)e6ce5f9e78scripted-diff: rename CSemaphore and CSemaphoreGrant (Cory Fields)793166d381wallet: change the write semaphore to a BinarySemaphore (Cory Fields)6790ad27f1scripted-diff: rename CSemaphoreGrant and CSemaphore for net (Cory Fields)d870bc9451threading: add temporary semaphore aliases (Cory Fields)7b816c4e00threading: rename CSemaphore methods to match std::semaphore (Cory Fields) Pull request description: This is relatively simple, but done in a bunch of commits to enable scripted diffs. I wanted to add a semaphore in a branch I've been working on, but it was unclear if I should use `std::counting_semaphore` or stick with our old `CSemaphore`. I couldn't decide, so I just decided to remove all doubt and get rid of ours :) This replaces our old `CSemaphore` with `std::counting_semaphore` everywhere we used it. `CSemaphoreGrant` is still there as an RAII wrapper, but is now called `CountingSemaphoreGrant` and `BinarySemaphoreGrant` to match. Those have been moved out of `sync.h` to their own file. ACKs for top commit: purpleKarrot: ACK6f7052a7b9achow101: ACK6f7052a7b9TheCharlatan: ACK6f7052a7b9hebasto: ACK6f7052a7b9, I have reviewed the code and it looks OK. Tree-SHA512: 5975d13aa21739174e3a22c544620ae3f36345f172b51612346d3b7baf0a07c39ef6fd54f647c87878c21a67951b347a5d4a5f90e897f3f6c0db360a3779d0df
This commit is contained in:
20
src/net.cpp
20
src/net.cpp
@@ -1886,7 +1886,7 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ
|
||||
if (max_connections != std::nullopt && existing_connections >= max_connections) return false;
|
||||
|
||||
// Max total outbound connections already exist
|
||||
CSemaphoreGrant grant(*semOutbound, true);
|
||||
CountingSemaphoreGrant<> grant(*semOutbound, true);
|
||||
if (!grant) return false;
|
||||
|
||||
OpenNetworkConnection(CAddress(), false, std::move(grant), address.c_str(), conn_type, /*use_v2transport=*/use_v2transport);
|
||||
@@ -2402,7 +2402,7 @@ void CConnman::ProcessAddrFetch()
|
||||
// peer doesn't support it or immediately disconnects us for another reason.
|
||||
const bool use_v2transport(GetLocalServices() & NODE_P2P_V2);
|
||||
CAddress addr;
|
||||
CSemaphoreGrant grant(*semOutbound, /*fTry=*/true);
|
||||
CountingSemaphoreGrant<> grant(*semOutbound, /*fTry=*/true);
|
||||
if (grant) {
|
||||
OpenNetworkConnection(addr, false, std::move(grant), strDest.c_str(), ConnectionType::ADDR_FETCH, use_v2transport);
|
||||
}
|
||||
@@ -2576,7 +2576,7 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect, std
|
||||
|
||||
PerformReconnections();
|
||||
|
||||
CSemaphoreGrant grant(*semOutbound);
|
||||
CountingSemaphoreGrant<> grant(*semOutbound);
|
||||
if (interruptNet)
|
||||
return;
|
||||
|
||||
@@ -2954,7 +2954,7 @@ void CConnman::ThreadOpenAddedConnections()
|
||||
AssertLockNotHeld(m_reconnections_mutex);
|
||||
while (true)
|
||||
{
|
||||
CSemaphoreGrant grant(*semAddnode);
|
||||
CountingSemaphoreGrant<> grant(*semAddnode);
|
||||
std::vector<AddedNodeInfo> vInfo = GetAddedNodeInfo(/*include_connected=*/false);
|
||||
bool tried = false;
|
||||
for (const AddedNodeInfo& info : vInfo) {
|
||||
@@ -2967,7 +2967,7 @@ void CConnman::ThreadOpenAddedConnections()
|
||||
CAddress addr(CService(), NODE_NONE);
|
||||
OpenNetworkConnection(addr, false, std::move(grant), info.m_params.m_added_node.c_str(), ConnectionType::MANUAL, info.m_params.m_use_v2transport);
|
||||
if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) return;
|
||||
grant = CSemaphoreGrant(*semAddnode, /*fTry=*/true);
|
||||
grant = CountingSemaphoreGrant<>(*semAddnode, /*fTry=*/true);
|
||||
}
|
||||
// See if any reconnections are desired.
|
||||
PerformReconnections();
|
||||
@@ -2978,7 +2978,7 @@ void CConnman::ThreadOpenAddedConnections()
|
||||
}
|
||||
|
||||
// if successful, this moves the passed grant to the constructed node
|
||||
void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant&& grant_outbound, const char *pszDest, ConnectionType conn_type, bool use_v2transport)
|
||||
void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CountingSemaphoreGrant<>&& grant_outbound, const char *pszDest, ConnectionType conn_type, bool use_v2transport)
|
||||
{
|
||||
AssertLockNotHeld(m_unused_i2p_sessions_mutex);
|
||||
assert(conn_type != ConnectionType::INBOUND);
|
||||
@@ -3329,11 +3329,11 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions)
|
||||
|
||||
if (semOutbound == nullptr) {
|
||||
// initialize semaphore
|
||||
semOutbound = std::make_unique<CSemaphore>(std::min(m_max_automatic_outbound, m_max_automatic_connections));
|
||||
semOutbound = std::make_unique<std::counting_semaphore<>>(std::min(m_max_automatic_outbound, m_max_automatic_connections));
|
||||
}
|
||||
if (semAddnode == nullptr) {
|
||||
// initialize semaphore
|
||||
semAddnode = std::make_unique<CSemaphore>(m_max_addnode);
|
||||
semAddnode = std::make_unique<std::counting_semaphore<>>(m_max_addnode);
|
||||
}
|
||||
|
||||
//
|
||||
@@ -3421,13 +3421,13 @@ void CConnman::Interrupt()
|
||||
|
||||
if (semOutbound) {
|
||||
for (int i=0; i<m_max_automatic_outbound; i++) {
|
||||
semOutbound->post();
|
||||
semOutbound->release();
|
||||
}
|
||||
}
|
||||
|
||||
if (semAddnode) {
|
||||
for (int i=0; i<m_max_addnode; i++) {
|
||||
semAddnode->post();
|
||||
semAddnode->release();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
11
src/net.h
11
src/net.h
@@ -24,6 +24,7 @@
|
||||
#include <policy/feerate.h>
|
||||
#include <protocol.h>
|
||||
#include <random.h>
|
||||
#include <semaphore_grant.h>
|
||||
#include <span.h>
|
||||
#include <streams.h>
|
||||
#include <sync.h>
|
||||
@@ -729,7 +730,7 @@ public:
|
||||
// Setting fDisconnect to true will cause the node to be disconnected the
|
||||
// next time DisconnectNodes() runs
|
||||
std::atomic_bool fDisconnect{false};
|
||||
CSemaphoreGrant grantOutbound;
|
||||
CountingSemaphoreGrant<> grantOutbound;
|
||||
std::atomic<int> nRefCount{0};
|
||||
|
||||
const uint64_t nKeyedNetGroup;
|
||||
@@ -1136,7 +1137,7 @@ public:
|
||||
bool GetNetworkActive() const { return fNetworkActive; };
|
||||
bool GetUseAddrmanOutgoing() const { return m_use_addrman_outgoing; };
|
||||
void SetNetworkActive(bool active);
|
||||
void OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant&& grant_outbound, const char* strDest, ConnectionType conn_type, bool use_v2transport) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex);
|
||||
void OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CountingSemaphoreGrant<>&& grant_outbound, const char* strDest, ConnectionType conn_type, bool use_v2transport) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex);
|
||||
bool CheckIncomingNonce(uint64_t nonce);
|
||||
void ASMapHealthCheck();
|
||||
|
||||
@@ -1491,8 +1492,8 @@ private:
|
||||
*/
|
||||
std::atomic<ServiceFlags> m_local_services;
|
||||
|
||||
std::unique_ptr<CSemaphore> semOutbound;
|
||||
std::unique_ptr<CSemaphore> semAddnode;
|
||||
std::unique_ptr<std::counting_semaphore<>> semOutbound;
|
||||
std::unique_ptr<std::counting_semaphore<>> semAddnode;
|
||||
|
||||
/**
|
||||
* Maximum number of automatic connections permitted, excluding manual
|
||||
@@ -1614,7 +1615,7 @@ private:
|
||||
struct ReconnectionInfo
|
||||
{
|
||||
CAddress addr_connect;
|
||||
CSemaphoreGrant grant;
|
||||
CountingSemaphoreGrant<> grant;
|
||||
std::string destination;
|
||||
ConnectionType conn_type;
|
||||
bool use_v2transport;
|
||||
|
||||
93
src/semaphore_grant.h
Normal file
93
src/semaphore_grant.h
Normal file
@@ -0,0 +1,93 @@
|
||||
// Copyright (c) 2009-2010 Satoshi Nakamoto
|
||||
// Copyright (c) 2009-present The Bitcoin Core developers
|
||||
// Distributed under the MIT software license, see the accompanying
|
||||
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
|
||||
#ifndef BITCOIN_SEMAPHORE_GRANT_H
|
||||
#define BITCOIN_SEMAPHORE_GRANT_H
|
||||
|
||||
#include <semaphore>
|
||||
|
||||
/** RAII-style semaphore lock */
|
||||
template <std::ptrdiff_t LeastMaxValue = std::counting_semaphore<>::max()>
|
||||
class CountingSemaphoreGrant
|
||||
{
|
||||
private:
|
||||
std::counting_semaphore<LeastMaxValue>* sem;
|
||||
bool fHaveGrant;
|
||||
|
||||
public:
|
||||
void Acquire() noexcept
|
||||
{
|
||||
if (fHaveGrant) {
|
||||
return;
|
||||
}
|
||||
sem->acquire();
|
||||
fHaveGrant = true;
|
||||
}
|
||||
|
||||
void Release() noexcept
|
||||
{
|
||||
if (!fHaveGrant) {
|
||||
return;
|
||||
}
|
||||
sem->release();
|
||||
fHaveGrant = false;
|
||||
}
|
||||
|
||||
bool TryAcquire() noexcept
|
||||
{
|
||||
if (!fHaveGrant && sem->try_acquire()) {
|
||||
fHaveGrant = true;
|
||||
}
|
||||
return fHaveGrant;
|
||||
}
|
||||
|
||||
// Disallow copy.
|
||||
CountingSemaphoreGrant(const CountingSemaphoreGrant&) = delete;
|
||||
CountingSemaphoreGrant& operator=(const CountingSemaphoreGrant&) = delete;
|
||||
|
||||
// Allow move.
|
||||
CountingSemaphoreGrant(CountingSemaphoreGrant&& other) noexcept
|
||||
{
|
||||
sem = other.sem;
|
||||
fHaveGrant = other.fHaveGrant;
|
||||
other.fHaveGrant = false;
|
||||
other.sem = nullptr;
|
||||
}
|
||||
|
||||
CountingSemaphoreGrant& operator=(CountingSemaphoreGrant&& other) noexcept
|
||||
{
|
||||
Release();
|
||||
sem = other.sem;
|
||||
fHaveGrant = other.fHaveGrant;
|
||||
other.fHaveGrant = false;
|
||||
other.sem = nullptr;
|
||||
return *this;
|
||||
}
|
||||
|
||||
CountingSemaphoreGrant() noexcept : sem(nullptr), fHaveGrant(false) {}
|
||||
|
||||
explicit CountingSemaphoreGrant(std::counting_semaphore<LeastMaxValue>& sema, bool fTry = false) noexcept : sem(&sema), fHaveGrant(false)
|
||||
{
|
||||
if (fTry) {
|
||||
TryAcquire();
|
||||
} else {
|
||||
Acquire();
|
||||
}
|
||||
}
|
||||
|
||||
~CountingSemaphoreGrant()
|
||||
{
|
||||
Release();
|
||||
}
|
||||
|
||||
explicit operator bool() const noexcept
|
||||
{
|
||||
return fHaveGrant;
|
||||
}
|
||||
};
|
||||
|
||||
using BinarySemaphoreGrant = CountingSemaphoreGrant<1>;
|
||||
|
||||
#endif // BITCOIN_SEMAPHORE_GRANT_H
|
||||
127
src/sync.h
127
src/sync.h
@@ -300,131 +300,4 @@ inline MutexType* MaybeCheckNotHeld(MutexType* m) LOCKS_EXCLUDED(m) LOCK_RETURNE
|
||||
//! gcc and the -Wreturn-stack-address flag in clang, both enabled by default.
|
||||
#define WITH_LOCK(cs, code) (MaybeCheckNotHeld(cs), [&]() -> decltype(auto) { LOCK(cs); code; }())
|
||||
|
||||
/** An implementation of a semaphore.
|
||||
*
|
||||
* See https://en.wikipedia.org/wiki/Semaphore_(programming)
|
||||
*/
|
||||
class CSemaphore
|
||||
{
|
||||
private:
|
||||
std::condition_variable condition;
|
||||
std::mutex mutex;
|
||||
int value;
|
||||
|
||||
public:
|
||||
explicit CSemaphore(int init) noexcept : value(init) {}
|
||||
|
||||
// Disallow default construct, copy, move.
|
||||
CSemaphore() = delete;
|
||||
CSemaphore(const CSemaphore&) = delete;
|
||||
CSemaphore(CSemaphore&&) = delete;
|
||||
CSemaphore& operator=(const CSemaphore&) = delete;
|
||||
CSemaphore& operator=(CSemaphore&&) = delete;
|
||||
|
||||
void wait() noexcept
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
condition.wait(lock, [&]() { return value >= 1; });
|
||||
value--;
|
||||
}
|
||||
|
||||
bool try_wait() noexcept
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
if (value < 1) {
|
||||
return false;
|
||||
}
|
||||
value--;
|
||||
return true;
|
||||
}
|
||||
|
||||
void post() noexcept
|
||||
{
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
value++;
|
||||
}
|
||||
condition.notify_one();
|
||||
}
|
||||
};
|
||||
|
||||
/** RAII-style semaphore lock */
|
||||
class CSemaphoreGrant
|
||||
{
|
||||
private:
|
||||
CSemaphore* sem;
|
||||
bool fHaveGrant;
|
||||
|
||||
public:
|
||||
void Acquire() noexcept
|
||||
{
|
||||
if (fHaveGrant) {
|
||||
return;
|
||||
}
|
||||
sem->wait();
|
||||
fHaveGrant = true;
|
||||
}
|
||||
|
||||
void Release() noexcept
|
||||
{
|
||||
if (!fHaveGrant) {
|
||||
return;
|
||||
}
|
||||
sem->post();
|
||||
fHaveGrant = false;
|
||||
}
|
||||
|
||||
bool TryAcquire() noexcept
|
||||
{
|
||||
if (!fHaveGrant && sem->try_wait()) {
|
||||
fHaveGrant = true;
|
||||
}
|
||||
return fHaveGrant;
|
||||
}
|
||||
|
||||
// Disallow copy.
|
||||
CSemaphoreGrant(const CSemaphoreGrant&) = delete;
|
||||
CSemaphoreGrant& operator=(const CSemaphoreGrant&) = delete;
|
||||
|
||||
// Allow move.
|
||||
CSemaphoreGrant(CSemaphoreGrant&& other) noexcept
|
||||
{
|
||||
sem = other.sem;
|
||||
fHaveGrant = other.fHaveGrant;
|
||||
other.fHaveGrant = false;
|
||||
other.sem = nullptr;
|
||||
}
|
||||
|
||||
CSemaphoreGrant& operator=(CSemaphoreGrant&& other) noexcept
|
||||
{
|
||||
Release();
|
||||
sem = other.sem;
|
||||
fHaveGrant = other.fHaveGrant;
|
||||
other.fHaveGrant = false;
|
||||
other.sem = nullptr;
|
||||
return *this;
|
||||
}
|
||||
|
||||
CSemaphoreGrant() noexcept : sem(nullptr), fHaveGrant(false) {}
|
||||
|
||||
explicit CSemaphoreGrant(CSemaphore& sema, bool fTry = false) noexcept : sem(&sema), fHaveGrant(false)
|
||||
{
|
||||
if (fTry) {
|
||||
TryAcquire();
|
||||
} else {
|
||||
Acquire();
|
||||
}
|
||||
}
|
||||
|
||||
~CSemaphoreGrant()
|
||||
{
|
||||
Release();
|
||||
}
|
||||
|
||||
explicit operator bool() const noexcept
|
||||
{
|
||||
return fHaveGrant;
|
||||
}
|
||||
};
|
||||
|
||||
#endif // BITCOIN_SYNC_H
|
||||
|
||||
@@ -445,7 +445,7 @@ void SQLiteBatch::Close()
|
||||
try {
|
||||
m_database.Open();
|
||||
// If TxnAbort failed and we refreshed the connection, the semaphore was not released, so release it here to avoid deadlocks on future writes.
|
||||
m_database.m_write_semaphore.post();
|
||||
m_database.m_write_semaphore.release();
|
||||
} catch (const std::runtime_error&) {
|
||||
// If open fails, cleanup this object and rethrow the exception
|
||||
m_database.Close();
|
||||
@@ -498,7 +498,7 @@ bool SQLiteBatch::WriteKey(DataStream&& key, DataStream&& value, bool overwrite)
|
||||
if (!BindBlobToStatement(stmt, 2, value, "value")) return false;
|
||||
|
||||
// Acquire semaphore if not previously acquired when creating a transaction.
|
||||
if (!m_txn) m_database.m_write_semaphore.wait();
|
||||
if (!m_txn) m_database.m_write_semaphore.acquire();
|
||||
|
||||
// Execute
|
||||
int res = sqlite3_step(stmt);
|
||||
@@ -508,7 +508,7 @@ bool SQLiteBatch::WriteKey(DataStream&& key, DataStream&& value, bool overwrite)
|
||||
LogPrintf("%s: Unable to execute statement: %s\n", __func__, sqlite3_errstr(res));
|
||||
}
|
||||
|
||||
if (!m_txn) m_database.m_write_semaphore.post();
|
||||
if (!m_txn) m_database.m_write_semaphore.release();
|
||||
|
||||
return res == SQLITE_DONE;
|
||||
}
|
||||
@@ -522,7 +522,7 @@ bool SQLiteBatch::ExecStatement(sqlite3_stmt* stmt, std::span<const std::byte> b
|
||||
if (!BindBlobToStatement(stmt, 1, blob, "key")) return false;
|
||||
|
||||
// Acquire semaphore if not previously acquired when creating a transaction.
|
||||
if (!m_txn) m_database.m_write_semaphore.wait();
|
||||
if (!m_txn) m_database.m_write_semaphore.acquire();
|
||||
|
||||
// Execute
|
||||
int res = sqlite3_step(stmt);
|
||||
@@ -532,7 +532,7 @@ bool SQLiteBatch::ExecStatement(sqlite3_stmt* stmt, std::span<const std::byte> b
|
||||
LogPrintf("%s: Unable to execute statement: %s\n", __func__, sqlite3_errstr(res));
|
||||
}
|
||||
|
||||
if (!m_txn) m_database.m_write_semaphore.post();
|
||||
if (!m_txn) m_database.m_write_semaphore.release();
|
||||
|
||||
return res == SQLITE_DONE;
|
||||
}
|
||||
@@ -651,12 +651,12 @@ std::unique_ptr<DatabaseCursor> SQLiteBatch::GetNewPrefixCursor(std::span<const
|
||||
bool SQLiteBatch::TxnBegin()
|
||||
{
|
||||
if (!m_database.m_db || m_txn) return false;
|
||||
m_database.m_write_semaphore.wait();
|
||||
m_database.m_write_semaphore.acquire();
|
||||
Assert(!m_database.HasActiveTxn());
|
||||
int res = Assert(m_exec_handler)->Exec(m_database, "BEGIN TRANSACTION");
|
||||
if (res != SQLITE_OK) {
|
||||
LogPrintf("SQLiteBatch: Failed to begin the transaction\n");
|
||||
m_database.m_write_semaphore.post();
|
||||
m_database.m_write_semaphore.release();
|
||||
} else {
|
||||
m_txn = true;
|
||||
}
|
||||
@@ -672,7 +672,7 @@ bool SQLiteBatch::TxnCommit()
|
||||
LogPrintf("SQLiteBatch: Failed to commit the transaction\n");
|
||||
} else {
|
||||
m_txn = false;
|
||||
m_database.m_write_semaphore.post();
|
||||
m_database.m_write_semaphore.release();
|
||||
}
|
||||
return res == SQLITE_OK;
|
||||
}
|
||||
@@ -686,7 +686,7 @@ bool SQLiteBatch::TxnAbort()
|
||||
LogPrintf("SQLiteBatch: Failed to abort the transaction\n");
|
||||
} else {
|
||||
m_txn = false;
|
||||
m_database.m_write_semaphore.post();
|
||||
m_database.m_write_semaphore.release();
|
||||
}
|
||||
return res == SQLITE_OK;
|
||||
}
|
||||
|
||||
@@ -8,6 +8,8 @@
|
||||
#include <sync.h>
|
||||
#include <wallet/db.h>
|
||||
|
||||
#include <semaphore>
|
||||
|
||||
struct bilingual_str;
|
||||
|
||||
struct sqlite3_stmt;
|
||||
@@ -127,7 +129,7 @@ public:
|
||||
|
||||
// Batches must acquire this semaphore on writing, and release when done writing.
|
||||
// This ensures that only one batch is modifying the database at a time.
|
||||
CSemaphore m_write_semaphore;
|
||||
std::binary_semaphore m_write_semaphore;
|
||||
|
||||
bool Verify(bilingual_str& error);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user