Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to new Storage API's #30

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
17 changes: 10 additions & 7 deletions src/Foundatio.AzureStorage/Extensions/StorageExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
using System;
using Foundatio.Storage;
using Microsoft.Azure.Storage.Blob;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs;

namespace Foundatio.Azure.Extensions {
public static class StorageExtensions {
public static FileSpec ToFileInfo(this CloudBlockBlob blob) {
if (blob.Properties.Length == -1)
public static FileSpec ToFileInfo(this BlobProperties blob, string name) {
if (blob.ContentLength == -1)
return null;

return new FileSpec {
Path = blob.Name,
Size = blob.Properties.Length,
Created = blob.Properties.LastModified?.UtcDateTime ?? DateTime.MinValue,
Modified = blob.Properties.LastModified?.UtcDateTime ?? DateTime.MinValue
Path = name,
Size = blob.ContentLength,
Created = blob.CreatedOn.UtcDateTime,
Modified = blob.LastModified.UtcDateTime
};
}


}
}
4 changes: 2 additions & 2 deletions src/Foundatio.AzureStorage/Foundatio.AzureStorage.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
<PackageTags>Queue;Messaging;Message;File;Distributed;Storage;Blob;Azure</PackageTags>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Azure.Storage.Blobs" Version="12.7.0" />
<PackageReference Include="Azure.Storage.Queues" Version="12.5.0" />
<PackageReference Include="Foundatio" Version="10.0.0" Condition="'$(ReferenceFoundatioSource)' == '' OR '$(ReferenceFoundatioSource)' == 'false'" />
<PackageReference Include="Microsoft.Azure.Storage.Blob" Version="11.0" />
<PackageReference Include="Microsoft.Azure.Storage.Queue" Version="11.0" />

<ProjectReference Include="..\..\..\Foundatio\src\Foundatio\Foundatio.csproj" Condition="'$(ReferenceFoundatioSource)' == 'true'" />
</ItemGroup>
Expand Down
84 changes: 45 additions & 39 deletions src/Foundatio.AzureStorage/Queues/AzureStorageQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
using Foundatio.Serializer;
using Foundatio.Utility;
using Microsoft.Extensions.Logging;
using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Queue;
using Azure.Storage.Queues;
using Azure.Storage.Queues.Models;
using LogLevel = Microsoft.Extensions.Logging.LogLevel;

namespace Foundatio.Queues {
public class AzureStorageQueue<T> : QueueBase<T, AzureStorageQueueOptions<T>> where T : class {
private readonly AsyncLock _lock = new AsyncLock();
private readonly CloudQueue _queueReference;
private readonly CloudQueue _deadletterQueueReference;
private readonly QueueClient _queueReference;
private readonly QueueClient _deadletterQueueReference;
private long _enqueuedCount;
private long _dequeuedCount;
private long _completedCount;
Expand All @@ -27,14 +27,20 @@ public class AzureStorageQueue<T> : QueueBase<T, AzureStorageQueueOptions<T>> wh
public AzureStorageQueue(AzureStorageQueueOptions<T> options) : base(options) {
if (String.IsNullOrEmpty(options.ConnectionString))
throw new ArgumentException("ConnectionString is required.");

var account = CloudStorageAccount.Parse(options.ConnectionString);
var client = account.CreateCloudQueueClient();
if (options.RetryPolicy != null)
client.DefaultRequestOptions.RetryPolicy = options.RetryPolicy;

_queueReference = client.GetQueueReference(_options.Name);
_deadletterQueueReference = client.GetQueueReference($"{_options.Name}-poison");
// properties going in the queueservice client ( should match with IRetryPolicy of v11)
// with exponential mode ( RetryOptions.Delay vs retryDelay , RetryOptons.MaxRetris vs retries of v11)
var queueClientOptions = new QueueClientOptions {
Retry = {
MaxRetries = options.Retries, //The maximum number of retry attempts before giving up
Delay = options.Delay, //The delay between retry attempts for a fixed approach or the delay on which to base
Mode = options.RetryMode
}
};
var queueServiceClient = new QueueServiceClient(options.ConnectionString, queueClientOptions);

_queueReference = queueServiceClient.GetQueueClient(_options.Name);
_deadletterQueueReference = queueServiceClient.GetQueueClient($"{_options.Name}-poison");
}

public AzureStorageQueue(Builder<AzureStorageQueueOptionsBuilder<T>, AzureStorageQueueOptions<T>> config)
Expand All @@ -49,9 +55,11 @@ protected override async Task EnsureQueueCreatedAsync(CancellationToken cancella
return;

var sw = Stopwatch.StartNew();
var qTask = _queueReference.CreateIfNotExistsAsync(cancellationToken: cancellationToken);
var dTask = _deadletterQueueReference.CreateIfNotExistsAsync(cancellationToken: cancellationToken);
await Task.WhenAll(
_queueReference.CreateIfNotExistsAsync(),
_deadletterQueueReference.CreateIfNotExistsAsync()
qTask,
dTask
).AnyContext();
_queueCreated = true;

Expand All @@ -65,25 +73,26 @@ protected override async Task<string> EnqueueImplAsync(T data, QueueEntryOptions
return null;

Interlocked.Increment(ref _enqueuedCount);
var message = new CloudQueueMessage(_serializer.SerializeToBytes(data));
await _queueReference.AddMessageAsync(message).AnyContext();
var body = _serializer.SerializeToBytes(data);
var binaryData = new BinaryData(body);
SendReceipt result = await _queueReference.SendMessageAsync(binaryData).AnyContext();

var entry = new QueueEntry<T>(message.Id, null, data, this, SystemClock.UtcNow, 0);
var entry = new QueueEntry<T>(result.MessageId, null, data, this, SystemClock.UtcNow, 0);
await OnEnqueuedAsync(entry).AnyContext();

return message.Id;
return result.MessageId;
}

protected override async Task<IQueueEntry<T>> DequeueImplAsync(CancellationToken linkedCancellationToken) {
var message = await _queueReference.GetMessageAsync(_options.WorkItemTimeout, null, null, !linkedCancellationToken.IsCancellationRequested ? linkedCancellationToken : CancellationToken.None).AnyContext();
QueueMessage[] receivedMessage = await _queueReference.ReceiveMessagesAsync(null, _options.WorkItemTimeout, !linkedCancellationToken.IsCancellationRequested ? linkedCancellationToken : CancellationToken.None).AnyContext();
bool isTraceLogLevelEnabled = _logger.IsEnabled(LogLevel.Trace);

if (message == null) {
if (receivedMessage != null && receivedMessage.Length == 0) {
var sw = Stopwatch.StartNew();
var lastReport = DateTime.Now;
if (isTraceLogLevelEnabled) _logger.LogTrace("No message available to dequeue, waiting...");

while (message == null && !linkedCancellationToken.IsCancellationRequested) {
while (receivedMessage != null && receivedMessage.Length == 0 && !linkedCancellationToken.IsCancellationRequested) {
if (isTraceLogLevelEnabled && DateTime.Now.Subtract(lastReport) > TimeSpan.FromSeconds(10))
_logger.LogTrace("Still waiting for message to dequeue: {Elapsed:g}", sw.Elapsed);

Expand All @@ -92,30 +101,31 @@ protected override async Task<IQueueEntry<T>> DequeueImplAsync(CancellationToken
await SystemClock.SleepAsync(_options.DequeueInterval, linkedCancellationToken).AnyContext();
} catch (OperationCanceledException) { }

message = await _queueReference.GetMessageAsync(_options.WorkItemTimeout, null, null, !linkedCancellationToken.IsCancellationRequested ? linkedCancellationToken : CancellationToken.None).AnyContext();
receivedMessage = await _queueReference.ReceiveMessagesAsync(null, _options.WorkItemTimeout, !linkedCancellationToken.IsCancellationRequested ? linkedCancellationToken : CancellationToken.None).AnyContext();
}

sw.Stop();
if (isTraceLogLevelEnabled) _logger.LogTrace("Waited to dequeue message: {Elapsed:g}", sw.Elapsed);
}

if (message == null) {
if (receivedMessage != null && receivedMessage.Length == 0) {
if (isTraceLogLevelEnabled) _logger.LogTrace("No message was dequeued.");
return null;
}

if (isTraceLogLevelEnabled) _logger.LogTrace("Dequeued message {Id}", message.Id);
if (isTraceLogLevelEnabled) _logger.LogTrace("Dequeued message {Id}", receivedMessage[0].MessageId);
Interlocked.Increment(ref _dequeuedCount);
var data = _serializer.Deserialize<T>(message.AsBytes);
var entry = new AzureStorageQueueEntry<T>(message, data, this);
var data = _serializer.Deserialize<T>(receivedMessage[0].Body.ToArray());
var entry = new AzureStorageQueueEntry<T>(receivedMessage[0], data, this);
await OnDequeuedAsync(entry).AnyContext();
return entry;
}

public override async Task RenewLockAsync(IQueueEntry<T> entry) {
if (_logger.IsEnabled(LogLevel.Debug)) _logger.LogDebug("Queue {Name} renew lock item: {EntryId}", _options.Name, entry.Id);
var azureQueueEntry = ToAzureEntryWithCheck(entry);
await _queueReference.UpdateMessageAsync(azureQueueEntry.UnderlyingMessage, _options.WorkItemTimeout, MessageUpdateFields.Visibility).AnyContext();
await _queueReference.UpdateMessageAsync(azureQueueEntry.UnderlyingMessage.MessageId, azureQueueEntry.UnderlyingMessage.PopReceipt, azureQueueEntry.UnderlyingMessage.Body, TimeSpan.Zero).AnyContext();

await OnLockRenewedAsync(entry).AnyContext();
if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Renew lock done: {EntryId}", entry.Id);
}
Expand All @@ -126,7 +136,7 @@ public override async Task CompleteAsync(IQueueEntry<T> entry) {
throw new InvalidOperationException("Queue entry has already been completed or abandoned.");

var azureQueueEntry = ToAzureEntryWithCheck(entry);
await _queueReference.DeleteMessageAsync(azureQueueEntry.UnderlyingMessage).AnyContext();
await _queueReference.DeleteMessageAsync(azureQueueEntry.UnderlyingMessage.MessageId, azureQueueEntry.UnderlyingMessage.PopReceipt).AnyContext();

Interlocked.Increment(ref _completedCount);
entry.MarkCompleted();
Expand All @@ -142,12 +152,12 @@ public override async Task AbandonAsync(IQueueEntry<T> entry) {
var azureQueueEntry = ToAzureEntryWithCheck(entry);
if (azureQueueEntry.Attempts > _options.Retries) {
await Task.WhenAll(
_queueReference.DeleteMessageAsync(azureQueueEntry.UnderlyingMessage),
_deadletterQueueReference.AddMessageAsync(azureQueueEntry.UnderlyingMessage)
_queueReference.DeleteMessageAsync(azureQueueEntry.UnderlyingMessage.MessageId, azureQueueEntry.UnderlyingMessage.PopReceipt),
_deadletterQueueReference.SendMessageAsync(azureQueueEntry.UnderlyingMessage.Body)
).AnyContext();
} else {
// Make the item visible immediately
await _queueReference.UpdateMessageAsync(azureQueueEntry.UnderlyingMessage, TimeSpan.Zero, MessageUpdateFields.Visibility).AnyContext();
await _queueReference.UpdateMessageAsync(azureQueueEntry.UnderlyingMessage.MessageId, azureQueueEntry.UnderlyingMessage.PopReceipt, azureQueueEntry.UnderlyingMessage.Body, TimeSpan.Zero).AnyContext();
}

Interlocked.Increment(ref _abandonedCount);
Expand All @@ -162,18 +172,14 @@ protected override Task<IEnumerable<T>> GetDeadletterItemsImplAsync(Cancellation

protected override async Task<QueueStats> GetQueueStatsImplAsync() {
if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Fetching stats.");
var sw = Stopwatch.StartNew();
await Task.WhenAll(
_queueReference.FetchAttributesAsync(),
_deadletterQueueReference.FetchAttributesAsync()
).AnyContext();
sw.Stop();
if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Fetching stats took {Elapsed:g}.", sw.Elapsed);

QueueProperties queuedMessageCount = await _queueReference.GetPropertiesAsync();
QueueProperties deadLetterQueueMessageCount = await _deadletterQueueReference.GetPropertiesAsync();

return new QueueStats {
Queued = _queueReference.ApproximateMessageCount.GetValueOrDefault(),
Queued = queuedMessageCount.ApproximateMessagesCount,
Working = 0,
Deadletter = _deadletterQueueReference.ApproximateMessageCount.GetValueOrDefault(),
Deadletter = deadLetterQueueMessageCount.ApproximateMessagesCount,
Enqueued = _enqueuedCount,
Dequeued = _dequeuedCount,
Completed = _completedCount,
Expand Down
8 changes: 4 additions & 4 deletions src/Foundatio.AzureStorage/Queues/AzureStorageQueueEntry.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
using System;
using Microsoft.Azure.Storage.Queue;
using Azure.Storage.Queues.Models;

namespace Foundatio.Queues {
public class AzureStorageQueueEntry<T> : QueueEntry<T> where T : class {
public CloudQueueMessage UnderlyingMessage { get; }
public QueueMessage UnderlyingMessage { get; }

public AzureStorageQueueEntry(CloudQueueMessage message, T value, IQueue<T> queue)
: base(message.Id, null, value, queue, message.InsertionTime.GetValueOrDefault().UtcDateTime, message.DequeueCount) {
public AzureStorageQueueEntry(QueueMessage message, T value, IQueue<T> queue)
: base(message.MessageId, null, value, queue, message.InsertedOn.GetValueOrDefault().UtcDateTime, (int)message.DequeueCount) {

UnderlyingMessage = message;
}
Expand Down
21 changes: 15 additions & 6 deletions src/Foundatio.AzureStorage/Queues/AzureStorageQueueOptions.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
using System;
using Microsoft.Azure.Storage.RetryPolicies;
using Azure.Core;

namespace Foundatio.Queues {
public class AzureStorageQueueOptions<T> : SharedQueueOptions<T> where T : class {
public string ConnectionString { get; set; }
public IRetryPolicy RetryPolicy { get; set; }
public TimeSpan DequeueInterval { get; set; } = TimeSpan.FromSeconds(2);
public RetryMode RetryMode { get; set; }

// The delay between retry attempts for a fixed approach or the delay on which to base calculations for a backoff-based approach.
public TimeSpan Delay { get; set; }
}

public class AzureStorageQueueOptionsBuilder<T> : SharedQueueOptionsBuilder<T, AzureStorageQueueOptions<T>, AzureStorageQueueOptionsBuilder<T>> where T: class {
Expand All @@ -14,14 +17,20 @@ public AzureStorageQueueOptionsBuilder<T> ConnectionString(string connectionStri
return this;
}

public AzureStorageQueueOptionsBuilder<T> RetryPolicy(IRetryPolicy retryPolicy) {
Target.RetryPolicy = retryPolicy ?? throw new ArgumentNullException(nameof(retryPolicy));
public AzureStorageQueueOptionsBuilder<T> DequeueInterval(TimeSpan dequeueInterval) {
Target.DequeueInterval = dequeueInterval;
return this;
}

public AzureStorageQueueOptionsBuilder<T> DequeueInterval(TimeSpan dequeueInterval) {
Target.DequeueInterval = dequeueInterval;
public AzureStorageQueueOptionsBuilder<T> RetryMode(RetryMode retryMode) {
Target.RetryMode = retryMode;
return this;
}

public AzureStorageQueueOptionsBuilder<T> RetryDelay(TimeSpan retryDelay) {
Target.Delay = retryDelay;
return this;
}

}
}
Loading