Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ AC_ARG_ENABLE([threadsafety],
AS_IF([test "x$enable_threadsafety" = "xyes"], [
AC_MSG_NOTICE([enabling thread safety static analysis, see https://clang.llvm.org/docs/ThreadSafetyAnalysis.html])
AS_CASE(["$CXX"],
[clang*], [CXXFLAGS="$CXXFLAGS -Wthread-safety -Werror=thread-safety -DTHREAD_SAFETY"],
[clang*], [CXXFLAGS="$CXXFLAGS -Wthread-safety -Werror=thread-safety -Wthread-safety-beta -Werror=thread-safety-beta -DTHREAD_SAFETY"],
Copy link
Contributor Author

@marta-lokhova marta-lokhova Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A slightly annoying discovery - some features like ACQUIRED_BEFORE/ACQUIRED_AFTER are not actually implemented in clang20 (as per llvm/llvm-project#51788). These are implemented in thread-safety-beta though, and newer clang versions properly promote them stable (see llvm/llvm-project#152853). Either way, I think it doesn't hurt to enable thread-safety-beta for more coverage.

[AC_MSG_WARN([Thread safety analysis is only supported with Clang compiler, skipping])])
])

Expand Down
2 changes: 1 addition & 1 deletion src/bucket/BucketManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class BucketManager : NonMovableOrCopyable
// mLedgerStateMutex to prevent deadlocks. Code must NOT hold mBucketMutex
// while trying to acquire LedgerManagerImpl::mLedgerStateMutex, as this
// will cause a deadlock.
mutable RecursiveMutex mBucketMutex;
ANNOTATED_RECURSIVE_MUTEX(mBucketMutex);

#ifdef THREAD_SAFETY
private:
Expand Down
2 changes: 1 addition & 1 deletion src/bucket/BucketSnapshotManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class BucketSnapshotManager : NonMovableOrCopyable
AppConnector& mAppConnector;

// Lock must be held when accessing any member variables holding snapshots
mutable SharedMutex mSnapshotMutex;
ANNOTATED_SHARED_MUTEX(mSnapshotMutex);

// Snapshot that is maintained and periodically updated by BucketManager on
// the main thread. When background threads need to generate or refresh a
Expand Down
14 changes: 7 additions & 7 deletions src/bucket/LiveBucketIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ LiveBucketIndex::maybeInitializeCache(size_t totalBucketListAccountsSizeBytes,
}

// Cache is already initialized
if (std::shared_lock<std::shared_mutex> lock(mCacheMutex); mCache)
if (SharedLockShared lock(mCacheMutex); mCache)
{
return;
}
Expand All @@ -123,7 +123,7 @@ LiveBucketIndex::maybeInitializeCache(size_t totalBucketListAccountsSizeBytes,
return;
}

std::unique_lock<std::shared_mutex> lock(mCacheMutex);
SharedLockExclusive lock(mCacheMutex);
if (totalBucketListAccountsSizeBytes < maxBucketListBytesToCache)
{
// We can cache the entire bucket
Expand Down Expand Up @@ -202,7 +202,7 @@ LiveBucketIndex::getCachedEntry(LedgerKey const& k) const
{
if (shouldUseCache() && isCachedType(k))
{
std::shared_lock<std::shared_mutex> lock(mCacheMutex);
SharedLockShared lock(mCacheMutex);
auto cachePtr = mCache->maybeGet(k);
if (cachePtr)
{
Expand Down Expand Up @@ -323,7 +323,7 @@ LiveBucketIndex::shouldUseCache() const
{
if (mDiskIndex)
{
std::shared_lock<std::shared_mutex> lock(mCacheMutex);
SharedLockShared lock(mCacheMutex);
return mCache != nullptr;
}

Expand Down Expand Up @@ -353,7 +353,7 @@ LiveBucketIndex::maybeAddToCache(
// earlier.
mCacheMissMeter.Mark();

std::unique_lock<std::shared_mutex> lock(mCacheMutex);
SharedLockExclusive lock(mCacheMutex);
mCache->put(k, entry);
}
}
Expand Down Expand Up @@ -392,7 +392,7 @@ LiveBucketIndex::getMaxCacheSize() const
{
if (shouldUseCache())
{
std::shared_lock<std::shared_mutex> lock(mCacheMutex);
SharedLockShared lock(mCacheMutex);
return mCache->maxSize();
}

Expand All @@ -405,7 +405,7 @@ LiveBucketIndex::getCurrentCacheSize() const
{
if (shouldUseCache())
{
std::shared_lock<std::shared_mutex> lock(mCacheMutex);
SharedLockShared lock(mCacheMutex);
return mCache->size();
}

Expand Down
6 changes: 3 additions & 3 deletions src/bucket/LiveBucketIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
#include "ledger/LedgerHashUtils.h" // IWYU pragma: keep
#include "util/NonCopyable.h"
#include "util/RandomEvictionCache.h"
#include "util/ThreadAnnotations.h"
#include "util/XDROperators.h" // IWYU pragma: keep
#include "xdr/Stellar-ledger-entries.h"
#include <filesystem>
#include <optional>

#include <cereal/archives/binary.hpp>
#include <shared_mutex>

namespace asio
{
Expand Down Expand Up @@ -65,8 +65,8 @@ class LiveBucketIndex : public NonMovableOrCopyable
// The indexes themselves are thread safe, as they are immutable after
// construction. The cache is not, all accesses must first acquire this
// mutex.
mutable std::unique_ptr<CacheT> mCache{};
mutable std::shared_mutex mCacheMutex;
mutable std::unique_ptr<CacheT> mCache GUARDED_BY(mCacheMutex){};
ANNOTATED_SHARED_MUTEX(mCacheMutex);

medida::Meter& mCacheHitMeter;
medida::Meter& mCacheMissMeter;
Expand Down
2 changes: 1 addition & 1 deletion src/invariant/InvariantManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class InvariantManagerImpl : public InvariantManager
std::string lastFailedWithMessage;
};

Mutex mutable mFailureInformationMutex;
ANNOTATED_MUTEX(mFailureInformationMutex);
std::map<std::string, InvariantFailureInformation>
mFailureInformation GUARDED_BY(mFailureInformationMutex);

Expand Down
8 changes: 3 additions & 5 deletions src/ledger/LedgerManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -296,11 +296,8 @@ class LedgerManagerImpl : public LedgerManager
VirtualClock::time_point mLastClose;

// Use mutex to guard ledger state during apply
mutable RecursiveMutex mLedgerStateMutex
#ifdef THREAD_SAFETY
ACQUIRED_BEFORE(BucketManager::mBucketMutex)
#endif
;
ANNOTATED_RECURSIVE_MUTEX(mLedgerStateMutex,
ACQUIRED_BEFORE(BucketManager::mBucketMutex));

medida::Timer& mCatchupDuration;

Expand Down Expand Up @@ -564,6 +561,7 @@ class LedgerManagerImpl : public LedgerManager
virtual bool
isApplying() const override
{
releaseAssert(threadIsMain());
return mCurrentlyApplyingLedger;
}
void markApplyStateReset() override;
Expand Down
2 changes: 1 addition & 1 deletion src/overlay/FlowControl.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class FlowControl
size_t mTxQueueByteCount GUARDED_BY(mFlowControlMutex){0};

// Mutex to synchronize flow control state
Mutex mutable mFlowControlMutex;
ANNOTATED_MUTEX(mFlowControlMutex);
// Is this peer currently throttled due to lack of capacity
std::optional<VirtualClock::time_point>
mLastThrottle GUARDED_BY(mFlowControlMutex);
Expand Down
1 change: 1 addition & 0 deletions src/overlay/Hmac.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ Hmac::setAuthenticatedMessageBody(AuthenticatedMessage& aMsg,
void
Hmac::damageRecvMacKey()
{
LOCK_GUARD(mMutex, guard);
auto bytes = randomBytes(mRecvMacKey.key.size());
std::copy(bytes.begin(), bytes.end(), mRecvMacKey.key.begin());
}
Expand Down
21 changes: 13 additions & 8 deletions src/overlay/Hmac.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,22 @@

using namespace stellar;

namespace stellar
{
class Peer;
}

class Hmac
{
#ifndef USE_TRACY
Mutex mMutex;
#else
TracyLockable(std::mutex, mMutex);
#ifdef THREAD_SAFETY
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: This friend declaration is not intuitive, a short comment would be nice.

friend class stellar::Peer;
#endif
HmacSha256Key mSendMacKey;
HmacSha256Key mRecvMacKey;
uint64_t mSendMacSeq{0};
uint64_t mRecvMacSeq{0};

ANNOTATED_MUTEX(mMutex);
HmacSha256Key mSendMacKey GUARDED_BY(mMutex);
HmacSha256Key mRecvMacKey GUARDED_BY(mMutex);
uint64_t mSendMacSeq GUARDED_BY(mMutex){0};
uint64_t mRecvMacSeq GUARDED_BY(mMutex){0};

public:
bool setSendMackey(HmacSha256Key const& key);
Expand Down
9 changes: 3 additions & 6 deletions src/overlay/Peer.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,9 @@ class Peer : public std::enable_shared_from_this<Peer>,
#endif

// Mutex to protect PeerState, which can be accessed and modified from
// multiple threads
#ifndef USE_TRACY
RecursiveMutex mutable mStateMutex;
#else
mutable TracyLockable(std::recursive_mutex, mStateMutex);
#endif
// multiple threads.
// LOCK ORDERING: mStateMutex must be acquired before Hmac::mMutex.
ANNOTATED_RECURSIVE_MUTEX(mStateMutex, ACQUIRED_BEFORE(Hmac::mMutex));

Hmac mHmac;
// Does local node have capacity to read from this peer
Expand Down
24 changes: 12 additions & 12 deletions src/process/ProcessManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ class ProcessExitEvent::Impl
size_t
ProcessManagerImpl::getNumRunningProcesses()
{
std::lock_guard<std::recursive_mutex> guard(mProcessesMutex);
RECURSIVE_LOCK_GUARD(mProcessesMutex, guard);
size_t n = 0;
for (auto const& pe : mProcesses)
{
Expand All @@ -212,7 +212,7 @@ ProcessManagerImpl::getNumRunningProcesses()
size_t
ProcessManagerImpl::getNumRunningOrShuttingDownProcesses()
{
std::lock_guard<std::recursive_mutex> guard(mProcessesMutex);
RECURSIVE_LOCK_GUARD(mProcessesMutex, guard);
return mProcesses.size();
}

Expand Down Expand Up @@ -262,7 +262,7 @@ ProcessManagerImpl::isShutdown() const
void
ProcessManagerImpl::shutdown()
{
std::lock_guard<std::recursive_mutex> guard(mProcessesMutex);
RECURSIVE_LOCK_GUARD(mProcessesMutex, guard);
if (!mIsShutdown)
{
mIsShutdown = true;
Expand All @@ -281,7 +281,7 @@ ProcessManagerImpl::shutdown()
void
ProcessManagerImpl::tryProcessShutdownAll()
{
std::lock_guard<std::recursive_mutex> guard(mProcessesMutex);
RECURSIVE_LOCK_GUARD(mProcessesMutex, guard);
for (auto const& pe : mProcesses)
{
tryProcessShutdown(pe.second);
Expand All @@ -292,7 +292,7 @@ bool
ProcessManagerImpl::tryProcessShutdown(std::shared_ptr<ProcessExitEvent> pe)
{
ZoneScoped;
std::lock_guard<std::recursive_mutex> guard(mProcessesMutex);
RECURSIVE_LOCK_GUARD(mProcessesMutex, guard);
checkInvariants();

if (!pe)
Expand Down Expand Up @@ -364,7 +364,7 @@ asio::error_code
ProcessManagerImpl::handleProcessTermination(int pid, int status)
{
ZoneScoped;
std::lock_guard<std::recursive_mutex> guard(mProcessesMutex);
RECURSIVE_LOCK_GUARD(mProcessesMutex, guard);
checkInvariants();

auto pair = mProcesses.find(pid);
Expand Down Expand Up @@ -668,14 +668,14 @@ ProcessManagerImpl::ProcessManagerImpl(Application& app)
, mTmpDir(
std::make_unique<TmpDir>(app.getTmpDirManager().tmpDir("process")))
{
std::lock_guard<std::recursive_mutex> guard(mProcessesMutex);
RECURSIVE_LOCK_GUARD(mProcessesMutex, guard);
startWaitingForSignalChild();
}

void
ProcessManagerImpl::startWaitingForSignalChild()
{
std::lock_guard<std::recursive_mutex> guard(mProcessesMutex);
RECURSIVE_LOCK_GUARD(mProcessesMutex, guard);
mSigChild.async_wait(
std::bind(&ProcessManagerImpl::handleSignalChild, this));
}
Expand All @@ -696,7 +696,7 @@ ProcessManagerImpl::reapChildren()
{
// Store tuples (pid, status)
std::vector<std::tuple<int, int>> signaledChildren;
std::lock_guard<std::recursive_mutex> guard(mProcessesMutex);
RECURSIVE_LOCK_GUARD(mProcessesMutex, guard);
for (auto const& pair : mProcesses)
{
int const pid = pair.first;
Expand Down Expand Up @@ -840,7 +840,7 @@ std::weak_ptr<ProcessExitEvent>
ProcessManagerImpl::runProcess(std::string const& cmdLine, std::string outFile)
{
ZoneScoped;
std::lock_guard<std::recursive_mutex> guard(mProcessesMutex);
RECURSIVE_LOCK_GUARD(mProcessesMutex, guard);
auto pe =
std::shared_ptr<ProcessExitEvent>(new ProcessExitEvent(mIOContext));

Expand All @@ -867,7 +867,7 @@ ProcessManagerImpl::maybeRunPendingProcesses()
{
return;
}
std::lock_guard<std::recursive_mutex> guard(mProcessesMutex);
RECURSIVE_LOCK_GUARD(mProcessesMutex, guard);
while (!mPending.empty() &&
getNumRunningOrShuttingDownProcesses() < mMaxProcesses)
{
Expand Down Expand Up @@ -905,7 +905,7 @@ ProcessManagerImpl::maybeRunPendingProcesses()
void
ProcessManagerImpl::checkInvariants()
{
std::lock_guard<std::recursive_mutex> guard(mProcessesMutex);
RECURSIVE_LOCK_GUARD(mProcessesMutex, guard);
if (mIsShutdown)
{
releaseAssertOrThrow(mPending.empty());
Expand Down
11 changes: 7 additions & 4 deletions src/process/ProcessManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#pragma once

#include "process/ProcessManager.h"
#include "util/ThreadAnnotations.h"
#include "util/TmpDir.h"
#include <atomic>
#include <deque>
Expand All @@ -18,22 +19,24 @@ class ProcessManagerImpl : public ProcessManager
{
// Subprocesses will be removed asynchronously, hence the lock on
// just the mProcesses member.
std::recursive_mutex mProcessesMutex;
ANNOTATED_RECURSIVE_MUTEX(mProcessesMutex);

// Stores a map from pid to running-or-shutting-down processes.
// Any ProcessExitEvent should be stored either in mProcesses
// or in mPending (before it's launched).
std::map<int, std::shared_ptr<ProcessExitEvent>> mProcesses;
std::map<int, std::shared_ptr<ProcessExitEvent>>
mProcesses GUARDED_BY(mProcessesMutex);

bool mIsShutdown{false};
std::atomic<bool> mIsShutdown{false};
size_t const mMaxProcesses;
asio::io_context& mIOContext;
// These are only used on POSIX, but they're harmless here.
asio::signal_set mSigChild;
std::unique_ptr<TmpDir> mTmpDir;
uint64_t mTempFileCount{0};

std::deque<std::shared_ptr<ProcessExitEvent>> mPending;
std::deque<std::shared_ptr<ProcessExitEvent>>
mPending GUARDED_BY(mProcessesMutex);
void maybeRunPendingProcesses();
void checkInvariants();

Expand Down
5 changes: 0 additions & 5 deletions src/util/GlobalChecks.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,8 @@ void dbgAbort();

#endif

#ifndef USE_TRACY
using RecursiveLockGuard = RecursiveMutexLocker;
using LockGuard = MutexLocker;
#else
using RecursiveLockGuard = std::lock_guard<LockableBase(std::recursive_mutex)>;
using LockGuard = std::lock_guard<LockableBase(std::mutex)>;
#endif
#define RECURSIVE_LOCK_GUARD(mutex_, guardName) \
RecursiveLockGuard guardName(mutex_)
#define LOCK_GUARD(mutex_, guardName) LockGuard guardName(mutex_)
Expand Down
2 changes: 1 addition & 1 deletion src/util/MetricsRegistry.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace stellar
// Wrapper around medida::MetricsRegistry to support registering `SimpleTimer`s
class MetricsRegistry : public medida::MetricsRegistry
{
Mutex mLock;
ANNOTATED_MUTEX(mLock);
// Note that it is safe to hand out references to this map because values
// have pointer stability.
std::map<SimpleTimerName, SimpleTimer> mSimpleTimers GUARDED_BY(mLock);
Expand Down
3 changes: 2 additions & 1 deletion src/util/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#pragma once

#include <atomic>
#include <chrono>
#include <functional>
#include <list>
Expand Down Expand Up @@ -183,7 +184,7 @@ class Scheduler
// or run.
size_t mSize{0};

bool mIsShutdown{false};
std::atomic<bool> mIsShutdown{false};

void trimSingleActionQueue(Qptr q,
std::chrono::steady_clock::time_point now);
Expand Down
Loading
Loading