Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions Source/Esendex.TokenBucket.Tests/TokenBucketTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,15 @@ public void ConsumeWhenTokenAvailable()
_sleepStrategy.Verify(s => s.Sleep(), Times.Never());
}

[Test, Timeout(ConsumeTimeout)]
public void ConsumeAsyncWhenTokenAvailable()
{
_refillStrategy.AddToken();
_bucket.ConsumeAsync().Wait();

_sleepStrategy.Verify(s => s.Sleep(), Times.Never());
}

[Test, Timeout(ConsumeTimeout)]
public void ConsumeWhenTokensAvailable()
{
Expand All @@ -97,6 +106,16 @@ public void ConsumeWhenTokensAvailable()
_sleepStrategy.Verify(s => s.Sleep(), Times.Never());
}

[Test, Timeout(ConsumeTimeout)]
public void ConsumeAsyncWhenTokensAvailable()
{
const int tokensToConsume = 2;
_refillStrategy.AddTokens(tokensToConsume);
_bucket.ConsumeAsync(tokensToConsume).Wait();

_sleepStrategy.Verify(s => s.Sleep(), Times.Never());
}

[Test, Timeout(ConsumeTimeout)]
public void ConsumeWhenTokenUnavailable()
{
Expand All @@ -110,6 +129,19 @@ public void ConsumeWhenTokenUnavailable()
_sleepStrategy.Verify();
}

[Test, Timeout(ConsumeTimeout)]
public void ConsumeAsyncWhenTokenUnavailable()
{
_sleepStrategy
.Setup(s => s.Sleep())
.Callback(_refillStrategy.AddToken)
.Verifiable();

_bucket.ConsumeAsync().Wait();

_sleepStrategy.Verify();
}

[Test, Timeout(ConsumeTimeout)]
public void ConsumeWhenTokensUnavailable()
{
Expand All @@ -124,6 +156,20 @@ public void ConsumeWhenTokensUnavailable()
_sleepStrategy.Verify();
}

[Test, Timeout(ConsumeTimeout)]
public void ConsumeAsyncWhenTokensUnavailable()
{
const int tokensToConsume = 7;
_sleepStrategy
.Setup(s => s.Sleep())
.Callback(() => _refillStrategy.AddTokens(tokensToConsume))
.Verifiable();

_bucket.ConsumeAsync(tokensToConsume).Wait();

_sleepStrategy.Verify();
}

private sealed class MockRefillStrategy : IRefillStrategy
{
private long _numTokensToAdd;
Expand Down
15 changes: 15 additions & 0 deletions Source/Esendex.TokenBucket/ITokenBucket.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using System.Threading.Tasks;

namespace Esendex.TokenBucket
{
/// <summary>
Expand Down Expand Up @@ -29,10 +31,23 @@ public interface ITokenBucket
/// </summary>
void Consume();

/// <summary>
/// Consume a single token from the bucket asynchronously. This method does not block
/// <returns>A task that returns once a token has been consumed</returns>
/// </summary>
Task ConsumeAsync();

/// <summary>
/// Consumes multiple tokens from the bucket. If enough tokens are not currently available then this method will block
/// </summary>
/// <param name="numTokens">The number of tokens to consume from the bucket, must be a positive number.</param>
void Consume(long numTokens);

/// <summary>
/// Consume multiple tokens from the bucket asynchronously. This method does not block
/// <param name="numTokens">The number of tokens to consume from the bucket, must be a positive number.</param>
/// <returns>A task that returns once the requested tokens have been consumed</returns>
/// </summary>
Task ConsumeAsync(long numTokens);
}
}
48 changes: 34 additions & 14 deletions Source/Esendex.TokenBucket/TokenBucket.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading.Tasks;

namespace Esendex.TokenBucket
{
Expand Down Expand Up @@ -72,25 +73,44 @@ public bool TryConsume(long numTokens)
/// <summary>
/// Consume a single token from the bucket. If no token is currently available then this method will block until a
/// token becomes available.
/// </summary>
public void Consume()
{
Consume(1);
/// </summary>
public void Consume()
{
Consume(1);
}

/// <summary>
/// Consume a single token from the bucket asynchronously. This method does not block
/// <returns>A task that returns once a token has been consumed</returns>
/// </summary>
public Task ConsumeAsync()
{
return ConsumeAsync(1);
}

/// <summary>
/// Consumes multiple tokens from the bucket. If enough tokens are not currently available then this method will block
/// </summary>
/// <param name="numTokens">The number of tokens to consume from the bucket, must be a positive number.</param>
public void Consume(long numTokens)
{
while (true) {
if (TryConsume(numTokens)) {
break;
}

_sleepStrategy.Sleep();
}
/// <param name="numTokens">The number of tokens to consume from the bucket, must be a positive number.</param>
public void Consume(long numTokens)
{
while (true) {
if (TryConsume(numTokens)) {
break;
}

_sleepStrategy.Sleep();
}
}

/// <summary>
/// Consume multiple tokens from the bucket asynchronously. This method does not block
/// <param name="numTokens">The number of tokens to consume from the bucket, must be a positive number.</param>
/// <returns>A task that returns once the requested tokens have been consumed</returns>
/// </summary>
public Task ConsumeAsync(long numTokens)
{
return Task.Factory.StartNew(() => Consume(numTokens));
}
}
}