Skip to content

Commit 854cb97

Browse files
format
1 parent 5ab59a9 commit 854cb97

20 files changed

+920
-50
lines changed

src/Directory.Packages.props

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@
2222
<PackageVersion Include="System.Data.SqlClient" Version="4.8.6" />
2323
<PackageVersion Include="Moq" Version="4.20.70" />
2424
<PackageVersion Include="System.Threading.AccessControl" Version="8.0.0" Condition="'$(TargetFramework)' != 'net462'" />
25+
<PackageVersion Include="Testcontainers" Version="4.6.0" />
2526
<PackageVersion Include="ZooKeeperNetEx" Version="3.4.12.4" />
2627
<PackageVersion Include="IsExternalInit" Version="1.0.3" />
2728
<PackageVersion Include="Microsoft.Bcl.AsyncInterfaces" Version="8.0.0" Condition="'$(TargetFramework)' == 'netstandard2.0' OR '$(TargetFramework)' == 'net462'" />
2829
<PackageVersion Include="System.ValueTuple" Version="4.5.0" Condition="'$(TargetFramework)' == 'net462'" />
2930
<PackageVersion Include="System.Private.Uri" Version="4.3.2" />
31+
<PackageVersion Include="dotnet-etcd" Version="5.2.1" />
3032
</ItemGroup>
3133
</Project>
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFrameworks>netstandard2.1</TargetFrameworks>
5+
<RootNamespace>Medallion.Threading.FileSystem</RootNamespace>
6+
<GenerateDocumentationFile>True</GenerateDocumentationFile>
7+
<WarningLevel>4</WarningLevel>
8+
<LangVersion>Latest</LangVersion>
9+
<Nullable>enable</Nullable>
10+
<ImplicitUsings>enable</ImplicitUsings>
11+
</PropertyGroup>
12+
13+
<PropertyGroup>
14+
<Version>1.0.3</Version>
15+
<AssemblyVersion>1.0.0.0</AssemblyVersion>
16+
<Authors>Michael Adelson</Authors>
17+
<Description>Provides a distributed lock implementation based on etcd</Description>
18+
<Copyright>Copyright © 2020 Michael Adelson</Copyright>
19+
<PackageLicenseExpression>MIT</PackageLicenseExpression>
20+
<PackageTags>distributed etcd lock</PackageTags>
21+
<PackageProjectUrl>https://github.com/madelson/DistributedLock</PackageProjectUrl>
22+
<RepositoryUrl>https://github.com/madelson/DistributedLock</RepositoryUrl>
23+
<FileVersion>1.0.0.0</FileVersion>
24+
<PackageReleaseNotes>See https://github.com/madelson/DistributedLock#release-notes</PackageReleaseNotes>
25+
<SignAssembly>true</SignAssembly>
26+
<AssemblyOriginatorKeyFile>..\DistributedLock.snk</AssemblyOriginatorKeyFile>
27+
</PropertyGroup>
28+
29+
<PropertyGroup Condition="'$(Configuration)' == 'Release'">
30+
<Optimize>True</Optimize>
31+
<GeneratePackageOnBuild>True</GeneratePackageOnBuild>
32+
<TreatWarningsAsErrors>True</TreatWarningsAsErrors>
33+
<TreatSpecificWarningsAsErrors />
34+
<!-- see https://github.com/dotnet/sdk/issues/2679 -->
35+
<DebugType>embedded</DebugType>
36+
<!-- see https://mitchelsellers.com/blog/article/net-5-deterministic-builds-source-linking -->
37+
<ContinuousIntegrationBuild>true</ContinuousIntegrationBuild>
38+
<EmbedUntrackedSources>true</EmbedUntrackedSources>
39+
</PropertyGroup>
40+
41+
<PropertyGroup Condition="'$(Configuration)' == 'Debug'">
42+
<Optimize>False</Optimize>
43+
<NoWarn>1591</NoWarn>
44+
<DefineConstants>TRACE;DEBUG</DefineConstants>
45+
</PropertyGroup>
46+
47+
<ItemGroup>
48+
<ProjectReference Include="..\DistributedLock.Core\DistributedLock.Core.csproj" />
49+
</ItemGroup>
50+
51+
<ItemGroup>
52+
<PackageReference Include="dotnet-etcd" />
53+
<PackageReference Include="Nullable" Condition="'$(TargetFramework)' != 'netstandard2.1'">
54+
<PrivateAssets>all</PrivateAssets>
55+
</PackageReference>
56+
<PackageReference Include="Microsoft.SourceLink.GitHub" PrivateAssets="All"/>
57+
<PackageReference Include="Microsoft.CodeAnalysis.PublicApiAnalyzers" PrivateAssets="All" />
58+
</ItemGroup>
59+
60+
<ItemGroup>
61+
<Using Remove="System.Net.Http"/>
62+
</ItemGroup>
63+
64+
<Import Project="..\FixDistributedLockCoreDependencyVersion.targets" />
65+
</Project>
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
using dotnet_etcd;
2+
using dotnet_etcd.interfaces;
3+
using Etcdserverpb;
4+
using Grpc.Core;
5+
using Medallion.Threading.Internal;
6+
using V3Lockpb;
7+
8+
namespace Medallion.Threading.Etcd;
9+
10+
internal class EtcdClientWrapper
11+
{
12+
private readonly IEtcdClient _etcdClient;
13+
public EtcdClientWrapper(IEtcdClient etcdClient)
14+
{
15+
this._etcdClient = etcdClient ?? throw new ArgumentNullException(nameof(etcdClient));
16+
}
17+
18+
public ValueTask<LeaseGrantResponse> LeaseGrantAsync(LeaseGrantRequest request, CancellationToken cancellationToken)
19+
{
20+
return SyncViaAsync.IsSynchronous
21+
? new ValueTask<LeaseGrantResponse>(this._etcdClient.LeaseGrant(request,
22+
cancellationToken: cancellationToken))
23+
: new ValueTask<LeaseGrantResponse>(
24+
this._etcdClient.LeaseGrantAsync(request, cancellationToken: cancellationToken));
25+
}
26+
27+
public Task LeaseKeepAliveAsync(long leaseId, CancellationToken token)
28+
=> this._etcdClient.LeaseKeepAlive(leaseId, token);
29+
30+
public async ValueTask<LockResponse> LockAsync(LockRequest lockRequest, CancellationToken cancellationToken)
31+
{
32+
var response = SyncViaAsync.IsSynchronous
33+
? this._etcdClient.Lock(lockRequest, cancellationToken: cancellationToken)
34+
: await this._etcdClient.LockAsync(lockRequest, cancellationToken: cancellationToken).ConfigureAwait(false);
35+
36+
if (response == null)
37+
{
38+
throw new RpcException(new Status(StatusCode.Internal, "Lock failed"));
39+
}
40+
41+
return response;
42+
}
43+
44+
public async ValueTask LeaseRevokeAsync(LeaseRevokeRequest leaseRevokeRequest)
45+
{
46+
if (SyncViaAsync.IsSynchronous)
47+
{
48+
this._etcdClient.LeaseRevoke(leaseRevokeRequest);
49+
}
50+
else
51+
{
52+
await this._etcdClient.LeaseRevokeAsync(leaseRevokeRequest).ConfigureAwait(false);
53+
}
54+
}
55+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
using Medallion.Threading.Internal;
2+
3+
namespace Medallion.Threading.Etcd;
4+
5+
public partial class EtcdLeaseDistributedLock
6+
{
7+
// AUTO-GENERATED
8+
9+
IDistributedSynchronizationHandle? IDistributedLock.TryAcquire(TimeSpan timeout, CancellationToken cancellationToken) =>
10+
this.TryAcquire(timeout, cancellationToken);
11+
IDistributedSynchronizationHandle IDistributedLock.Acquire(TimeSpan? timeout, CancellationToken cancellationToken) =>
12+
this.Acquire(timeout, cancellationToken);
13+
ValueTask<IDistributedSynchronizationHandle?> IDistributedLock.TryAcquireAsync(TimeSpan timeout, CancellationToken cancellationToken) =>
14+
this.TryAcquireAsync(timeout, cancellationToken).Convert(To<IDistributedSynchronizationHandle?>.ValueTask);
15+
ValueTask<IDistributedSynchronizationHandle> IDistributedLock.AcquireAsync(TimeSpan? timeout, CancellationToken cancellationToken) =>
16+
this.AcquireAsync(timeout, cancellationToken).Convert(To<IDistributedSynchronizationHandle>.ValueTask);
17+
18+
/// <summary>
19+
/// Attempts to acquire the lock synchronously. Usage:
20+
/// <code>
21+
/// using (var handle = myLock.TryAcquire(...))
22+
/// {
23+
/// if (handle != null) { /* we have the lock! */ }
24+
/// }
25+
/// // dispose releases the lock if we took it
26+
/// </code>
27+
/// </summary>
28+
/// <param name="timeout">How long to wait before giving up on the acquisition attempt. Defaults to 0</param>
29+
/// <param name="cancellationToken">Specifies a token by which the wait can be canceled</param>
30+
/// <returns>An <see cref="EtcdLeaseDistributedLockHandle"/> which can be used to release the lock or null on failure</returns>
31+
public EtcdLeaseDistributedLockHandle? TryAcquire(TimeSpan timeout = default, CancellationToken cancellationToken = default) =>
32+
DistributedLockHelpers.TryAcquire(this, timeout, cancellationToken);
33+
34+
/// <summary>
35+
/// Acquires the lock synchronously, failing with <see cref="TimeoutException"/> if the attempt times out. Usage:
36+
/// <code>
37+
/// using (myLock.Acquire(...))
38+
/// {
39+
/// /* we have the lock! */
40+
/// }
41+
/// // dispose releases the lock
42+
/// </code>
43+
/// </summary>
44+
/// <param name="timeout">How long to wait before giving up on the acquisition attempt. Defaults to <see cref="Timeout.InfiniteTimeSpan"/></param>
45+
/// <param name="cancellationToken">Specifies a token by which the wait can be canceled</param>
46+
/// <returns>An <see cref="EtcdLeaseDistributedLockHandle"/> which can be used to release the lock</returns>
47+
public EtcdLeaseDistributedLockHandle Acquire(TimeSpan? timeout = null, CancellationToken cancellationToken = default) =>
48+
DistributedLockHelpers.Acquire(this, timeout, cancellationToken);
49+
50+
/// <summary>
51+
/// Attempts to acquire the lock asynchronously. Usage:
52+
/// <code>
53+
/// await using (var handle = await myLock.TryAcquireAsync(...))
54+
/// {
55+
/// if (handle != null) { /* we have the lock! */ }
56+
/// }
57+
/// // dispose releases the lock if we took it
58+
/// </code>
59+
/// </summary>
60+
/// <param name="timeout">How long to wait before giving up on the acquisition attempt. Defaults to 0</param>
61+
/// <param name="cancellationToken">Specifies a token by which the wait can be canceled</param>
62+
/// <returns>An <see cref="EtcdLeaseDistributedLockHandle"/> which can be used to release the lock or null on failure</returns>
63+
public ValueTask<EtcdLeaseDistributedLockHandle?> TryAcquireAsync(TimeSpan timeout = default, CancellationToken cancellationToken = default) =>
64+
this.As<IInternalDistributedLock<EtcdLeaseDistributedLockHandle>>().InternalTryAcquireAsync(timeout, cancellationToken);
65+
66+
/// <summary>
67+
/// Acquires the lock asynchronously, failing with <see cref="TimeoutException"/> if the attempt times out. Usage:
68+
/// <code>
69+
/// await using (await myLock.AcquireAsync(...))
70+
/// {
71+
/// /* we have the lock! */
72+
/// }
73+
/// // dispose releases the lock
74+
/// </code>
75+
/// </summary>
76+
/// <param name="timeout">How long to wait before giving up on the acquisition attempt. Defaults to <see cref="Timeout.InfiniteTimeSpan"/></param>
77+
/// <param name="cancellationToken">Specifies a token by which the wait can be canceled</param>
78+
/// <returns>An <see cref="EtcdLeaseDistributedLockHandle"/> which can be used to release the lock</returns>
79+
public ValueTask<EtcdLeaseDistributedLockHandle> AcquireAsync(TimeSpan? timeout = null, CancellationToken cancellationToken = default) =>
80+
DistributedLockHelpers.AcquireAsync(this, timeout, cancellationToken);
81+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
using dotnet_etcd.interfaces;
2+
using Etcdserverpb;
3+
using Medallion.Threading.Internal;
4+
using V3Lockpb;
5+
6+
namespace Medallion.Threading.Etcd;
7+
8+
/// <summary>
9+
/// A distributed lock based on holding an exclusive handle to a lock file. The file will be deleted when the lock is released.
10+
/// </summary>
11+
public sealed partial class EtcdLeaseDistributedLock : IInternalDistributedLock<EtcdLeaseDistributedLockHandle>
12+
{
13+
private readonly EtcdClientWrapper _etcdClient;
14+
15+
private readonly (TimeoutValue duration, TimeoutValue renewalCadence, TimeoutValue minBusyWaitSleepTime,
16+
TimeoutValue maxBusyWaitSleepTime) _options;
17+
18+
public EtcdLeaseDistributedLock(IEtcdClient client, string lockName,
19+
Action<EtcdLeaseOptionsBuilder>? options = null)
20+
{
21+
// TODO must support all non-null lock name via getsafename
22+
this.Name = lockName ?? throw new ArgumentNullException(nameof(lockName));
23+
if (lockName.Length == 0) { throw new FormatException($"{nameof(lockName)}: may not have an empty file name"); }
24+
25+
this._etcdClient = new EtcdClientWrapper(client);
26+
this._options = EtcdLeaseOptionsBuilder.GetOptions(options);
27+
}
28+
29+
// todo revisit API
30+
/// <summary>
31+
/// Implements <see cref="IDistributedLock.Name"/>
32+
/// </summary>
33+
public string Name { get; }
34+
35+
36+
ValueTask<EtcdLeaseDistributedLockHandle?> IInternalDistributedLock<EtcdLeaseDistributedLockHandle>.
37+
InternalTryAcquireAsync(TimeoutValue timeout, CancellationToken cancellationToken) =>
38+
BusyWaitHelper.WaitAsync(
39+
state: this,
40+
tryGetValue: (@this, token) => @this.TryAcquireAsync(token),
41+
timeout: timeout,
42+
minSleepTime: this._options.minBusyWaitSleepTime,
43+
maxSleepTime: this._options.maxBusyWaitSleepTime,
44+
cancellationToken
45+
);
46+
47+
private async ValueTask<EtcdLeaseDistributedLockHandle?> TryAcquireAsync(CancellationToken cancellationToken)
48+
{
49+
// TODO implement LeaseHandle
50+
cancellationToken.ThrowIfCancellationRequested();
51+
// TODO renewnal cadence should not be here
52+
var leaseResponse = await this._etcdClient.LeaseGrantAsync(
53+
new LeaseGrantRequest { TTL = this._options.renewalCadence.InSeconds },
54+
cancellationToken: cancellationToken).ConfigureAwait(false);
55+
var leaseId = leaseResponse.ID;
56+
var cancellationTokenSource = new CancellationTokenSource();
57+
_ = this._etcdClient.LeaseKeepAliveAsync(leaseId, cancellationTokenSource.Token).ConfigureAwait(false);
58+
var response =
59+
await this._etcdClient.LockAsync(
60+
new LockRequest { Name = Google.Protobuf.ByteString.CopyFromUtf8(this.Name), Lease = leaseId, },
61+
cancellationToken: cancellationTokenSource.Token).ConfigureAwait(false);
62+
var actualKey = response.Key;
63+
return new EtcdLeaseDistributedLockHandle(actualKey.ToString(), this._etcdClient, leaseId);
64+
}
65+
66+
67+
private static string GetSafeName(string name)
68+
{
69+
// TODO figure
70+
return DistributedLockHelpers.ToSafeName(name, 1000, s => s);
71+
}
72+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
using Etcdserverpb;
2+
using Medallion.Threading.Internal;
3+
4+
namespace Medallion.Threading.Etcd;
5+
6+
public sealed class EtcdLeaseDistributedLockHandle : IDistributedSynchronizationHandle
7+
{
8+
private readonly string _key;
9+
private readonly long _leaseKey;
10+
private readonly EtcdClientWrapper _client;
11+
12+
internal EtcdLeaseDistributedLockHandle(string key, EtcdClientWrapper client, long leaseKey)
13+
{
14+
this._key = key ?? throw new ArgumentNullException(nameof(key));
15+
this._client = client ?? throw new ArgumentNullException(nameof(client));
16+
this._leaseKey = leaseKey;
17+
// Because this is a lease, managed finalization mostly won't be strictly necessary here. Where it comes in handy is:
18+
// (1) Ensuring blob deletion if we own the blob
19+
// (2) Helping release infinite-duration leases (rare case)
20+
// (3) In testing, avoiding having to wait 15+ seconds for lease expiration
21+
}
22+
23+
24+
/// <summary>
25+
/// Implements <see cref="IDistributedSynchronizationHandle.HandleLostToken"/>
26+
/// TODO implement HandleLostToken
27+
/// </summary>
28+
public CancellationToken HandleLostToken => new();
29+
30+
31+
/// <summary>
32+
/// Releases the lock
33+
/// </summary>
34+
public void Dispose() => this.DisposeSyncViaAsync();
35+
36+
/// <summary>
37+
/// Releases the lock asynchronously
38+
/// </summary>
39+
public ValueTask DisposeAsync()
40+
{
41+
//TODO lease revoke will die here
42+
return this._client.LeaseRevokeAsync(new LeaseRevokeRequest { ID = this._leaseKey });
43+
}
44+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
using dotnet_etcd.interfaces;
2+
3+
namespace Medallion.Threading.Etcd;
4+
5+
/// <summary>
6+
/// Implements <see cref="IDistributedLockProvider"/> for <see cref="AzureBlobLeaseDistributedLock"/>
7+
/// </summary>
8+
public sealed class EtcdLeaseDistributedLockProvider : IDistributedLockProvider
9+
{
10+
private readonly IEtcdClient _blobContainerClient;
11+
private readonly Action<EtcdLeaseOptionsBuilder>? _options;
12+
13+
/// <summary>
14+
/// Constructs a provider that scopes blobs within the provided <paramref name="blobContainerClient"/> and uses the provided <paramref name="options"/>.
15+
/// </summary>
16+
public EtcdLeaseDistributedLockProvider(IEtcdClient blobContainerClient,
17+
Action<EtcdLeaseOptionsBuilder>? options = null)
18+
{
19+
this._blobContainerClient = blobContainerClient ?? throw new ArgumentNullException(nameof(blobContainerClient));
20+
this._options = options;
21+
}
22+
23+
/// <summary>
24+
/// Constructs an <see cref="AzureBlobLeaseDistributedLock"/> with the given <paramref name="name"/>.
25+
/// </summary>
26+
public EtcdLeaseDistributedLock CreateLock(string name) => new(this._blobContainerClient, name, this._options);
27+
28+
IDistributedLock IDistributedLockProvider.CreateLock(string name) => this.CreateLock(name);
29+
}

0 commit comments

Comments
 (0)