Skip to content

Commit

Permalink
maxdatapoints (#2499)
Browse files Browse the repository at this point in the history
  • Loading branch information
robertbasti authored Aug 2, 2024
1 parent 5f376c0 commit 38c43e0
Show file tree
Hide file tree
Showing 12 changed files with 152 additions and 94 deletions.
10 changes: 10 additions & 0 deletions Src/Witsml/CommonConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,14 @@ public static class Unit
public const string Feet = "ft";
public const string Second = "s";
}

public static class WitsmlQueryTypeName
{
public const string Log = "log";
}

public static class WitsmlFunctionType
{
public const string WMLSUpdateInStore = "WMLS_UpdateInStore";
};
}
37 changes: 36 additions & 1 deletion Src/WitsmlExplorer.Api/Workers/BaseWorker.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

using Microsoft.Extensions.Logging;

using Witsml;
using Witsml.Data;
using Witsml.Extensions;

using WitsmlExplorer.Api.Extensions;
using WitsmlExplorer.Api.Jobs;
Expand All @@ -21,19 +24,24 @@ public abstract class BaseWorker<T> where T : Job
protected ILogger<T> Logger { get; }
private IWitsmlClientProvider WitsmlClientProvider { get; }

private WitsmlCapServers _targetServerCapabilities;

public BaseWorker(ILogger<T> logger = null)
{
Logger = logger;
}

public BaseWorker(IWitsmlClientProvider witsmlClientProvider, ILogger<T> logger = null)
{
Logger = logger;
WitsmlClientProvider = witsmlClientProvider;
}

protected IWitsmlClient GetTargetWitsmlClientOrThrow()
{
return WitsmlClientProvider.GetClient() ?? throw new WitsmlClientProviderException($"Missing Target WitsmlClient for {typeof(T)}", (int)HttpStatusCode.Unauthorized, ServerType.Target);
}

protected IWitsmlClient GetSourceWitsmlClientOrThrow()
{
return WitsmlClientProvider.GetSourceClient() ?? throw new WitsmlClientProviderException($"Missing Source WitsmlClient for {typeof(T)}", (int)HttpStatusCode.Unauthorized, ServerType.Source);
Expand All @@ -59,6 +67,33 @@ protected string CancellationReason()
{
return "The job was cancelled by the user.";
}
protected async Task<WitsmlCapServers> GetTargetServerCapabilities()
{
if (_targetServerCapabilities == null)
{
_targetServerCapabilities = await GetTargetWitsmlClientOrThrow().GetCap();
}
return _targetServerCapabilities;
}

protected async Task<int> GetMaxBatchSize(int objectsCount, string functionType, string queryTypeName)
{
var targetServerCapabilities = await GetTargetServerCapabilities();
var serverCapabilites =
targetServerCapabilities.ServerCapabilities;

var functions = serverCapabilites.Select(x => x.Functions.Find(y => y.Name.Equals(functionType)));
var objectCapabilities = functions.Select(x =>
x.DataObjects.Find(y => y.Name.Equals(queryTypeName)));

var maxDataRows = objectCapabilities.FirstOrDefault().MaxDataNodes;
var maxDataPoints = objectCapabilities.FirstOrDefault().MaxDataPoints;

var maxBatchSize =
Math.Min(maxDataRows, maxDataPoints / objectsCount);
return maxBatchSize;
}

public async Task<(Task<(WorkerResult, RefreshAction)>, Job)> SetupWorker(Stream jobStream, CancellationToken? cancellationToken = null)
{
T job = await jobStream.Deserialize<T>();
Expand Down
25 changes: 14 additions & 11 deletions Src/WitsmlExplorer.Api/Workers/Copy/CopyLogDataWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -175,25 +175,28 @@ private async Task<CopyResult> CopyLogData(WitsmlLog sourceLog, WitsmlLog target

await using LogDataReader logDataReader = new(GetSourceWitsmlClientOrThrow(), sourceLog, mnemonics, Logger);
WitsmlLogData sourceLogData = await logDataReader.GetNextBatch();
var chunkMaxSize = await GetMaxBatchSize(mnemonics.Count, CommonConstants.WitsmlFunctionType.WMLSUpdateInStore, CommonConstants.WitsmlQueryTypeName.Log);

while (sourceLogData != null)
{
var mnemonicList = targetLog.IndexCurve.Value + sourceLogData.MnemonicList[sourceLogData.MnemonicList.IndexOf(CommonConstants.DataSeparator, StringComparison.InvariantCulture)..];
var updateLogDataQueries = LogWorkerTools.GetUpdateLogDataQueries(targetLog.Uid, targetLog.UidWell, targetLog.UidWellbore, sourceLogData, chunkMaxSize, mnemonicList);
if (cancellationToken is { IsCancellationRequested: true })
{
return new CopyResult { Success = false, NumberOfRowsCopied = numberOfDataRowsCopied, ErrorReason = CancellationReason() };
}
WitsmlLogs copyNewCurvesQuery = CreateCopyQuery(targetLog, sourceLogData);
QueryResult result = await RequestUtils.WithRetry(async () => await GetTargetWitsmlClientOrThrow().UpdateInStoreAsync(copyNewCurvesQuery), Logger);
if (result.IsSuccessful)
{
numberOfDataRowsCopied += sourceLogData.Data.Count;
UpdateJobProgress(job, sourceLog, sourceLogData);
}
else
foreach (var query in updateLogDataQueries)
{
Logger.LogError("Failed to copy log data. - {Description} - Current index: {StartIndex}", job.Description(), logDataReader.StartIndex);
return new CopyResult { Success = false, NumberOfRowsCopied = numberOfDataRowsCopied, ErrorReason = result.Reason };
}

var result = await RequestUtils.WithRetry(async () => await GetTargetWitsmlClientOrThrow().UpdateInStoreAsync(query), Logger);
if (!result.IsSuccessful)
{
Logger.LogError("Failed to copy log data. - {Description} - Current index: {StartIndex}", job.Description(), logDataReader.StartIndex);
return new CopyResult { Success = false, NumberOfRowsCopied = numberOfDataRowsCopied, ErrorReason = result.Reason };
}
}
numberOfDataRowsCopied += sourceLogData.Data.Count;
UpdateJobProgress(job, sourceLog, sourceLogData);
sourceLogData = await logDataReader.GetNextBatch();
}

Expand Down
51 changes: 20 additions & 31 deletions Src/WitsmlExplorer.Api/Workers/ImportLogDataWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ public class ImportLogDataWorker : BaseWorker<ImportLogDataJob>, IWorker
public ImportLogDataWorker(ILogger<ImportLogDataJob> logger, IWitsmlClientProvider witsmlClientProvider) : base(witsmlClientProvider, logger) { }
public override async Task<(WorkerResult, RefreshAction)> Execute(ImportLogDataJob job, CancellationToken? cancellationToken = null)
{
int chunkSize = 1000;
int maxUpdateAttempts = 2;
string wellUid = job.TargetLog.WellUid;
string wellboreUid = job.TargetLog.WellboreUid;
Expand Down Expand Up @@ -54,8 +53,24 @@ public ImportLogDataWorker(ILogger<ImportLogDataJob> logger, IWitsmlClientProvid
}
}

//Todo: find a way to determine the maximum amount of rows that can be sent to the WITSML server then pass that amount to the CreateImportQueries method
WitsmlLogs[] queries = CreateImportQueries(job, chunkSize).ToArray();
var dataRows = job.DataRows
.Where(d => d.Count() > 1)
.Select(row => new WitsmlData
{
Data = string.Join(CommonConstants.DataSeparator, row)
});

var logData = new WitsmlLogData()
{
Data = dataRows.ToList(),
UnitList = string.Join(CommonConstants.DataSeparator, job.Units)
};
var mnemonicList =
string.Join(CommonConstants.DataSeparator, job.Mnemonics);

var chunkMaxSize = await GetMaxBatchSize(job.Mnemonics.Count, CommonConstants.WitsmlFunctionType.WMLSUpdateInStore, CommonConstants.WitsmlQueryTypeName.Log);

var queries = LogWorkerTools.GetUpdateLogDataQueries(witsmlLog.Uid, witsmlLog.UidWell, witsmlLog.UidWellbore, logData, chunkMaxSize, mnemonicList).ToArray();

for (int i = 0; i < queries.Length; i++)
{
Expand All @@ -73,10 +88,10 @@ public ImportLogDataWorker(ILogger<ImportLogDataJob> logger, IWitsmlClientProvid
job.Description(),
i,
maxUpdateAttempts,
chunkSize,
chunkMaxSize,
queries.Length);

return (new WorkerResult(GetTargetWitsmlClientOrThrow().GetServerHostname(), result.IsSuccessful, $"Failed to import curve data from row: {i * chunkSize}", result.Reason, witsmlLog.GetDescription()), null);
return (new WorkerResult(GetTargetWitsmlClientOrThrow().GetServerHostname(), result.IsSuccessful, $"Failed to import curve data from row: {i * chunkMaxSize}", result.Reason, witsmlLog.GetDescription()), null);
}
}
double progress = (i + 1) / (double)queries.Length;
Expand All @@ -99,32 +114,6 @@ private async Task<WitsmlLog> GetLogHeader(string wellUid, string wellboreUid, s
return result?.Logs.FirstOrDefault();
}

private static IEnumerable<WitsmlLogs> CreateImportQueries(ImportLogDataJob job, int chunkSize)
{
return job.DataRows
.Where(d => d.Count() > 1)
.Select(row => new WitsmlData { Data = string.Join(CommonConstants.DataSeparator, row) })
.Chunk(chunkSize)
.Select(logData => new WitsmlLogs
{
Logs = new List<WitsmlLog>
{
new WitsmlLog
{
Uid = job.TargetLog.Uid,
UidWellbore = job.TargetLog.WellboreUid,
UidWell = job.TargetLog.WellUid,
LogData = new WitsmlLogData
{
Data = logData.ToList(),
MnemonicList = string.Join(CommonConstants.DataSeparator, job.Mnemonics),
UnitList = string.Join(CommonConstants.DataSeparator, job.Units)
}
}
},
});
}

private static WitsmlLogs CreateAddMnemonicsQuery(ImportLogDataJob job, WitsmlLog witsmlLog)
{
return new WitsmlLogs
Expand Down
31 changes: 6 additions & 25 deletions Src/WitsmlExplorer.Api/Workers/OffsetLogCurveWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using Microsoft.Extensions.Logging;

using Witsml;
using Witsml.Data;
using Witsml.Extensions;
using Witsml.ServiceReference;
Expand Down Expand Up @@ -176,7 +177,11 @@ private async Task DeleteLogData(WitsmlLog log, WitsmlLogCurveInfo logCurveInfo,

private async Task UpdateLogData(WitsmlLog log, WitsmlLogCurveInfo logCurveinfo, WitsmlLogData offsetLogData)
{
var queries = GetUpdateLogDataQueries(log, offsetLogData);
var mnemonics = offsetLogData.MnemonicList.Split(CommonConstants.DataSeparator).ToList();
var chunkMaxSize = await GetMaxBatchSize(mnemonics.Count, CommonConstants.WitsmlFunctionType.WMLSUpdateInStore, CommonConstants.WitsmlQueryTypeName.Log);
var mnemonicList = offsetLogData.MnemonicList;

var queries = LogWorkerTools.GetUpdateLogDataQueries(log.Uid, log.UidWell, log.UidWellbore, offsetLogData, chunkMaxSize, mnemonicList);
foreach (var query in queries)
{

Expand All @@ -188,30 +193,6 @@ private async Task UpdateLogData(WitsmlLog log, WitsmlLogCurveInfo logCurveinfo,
}
}

private static List<WitsmlLogs> GetUpdateLogDataQueries(WitsmlLog log, WitsmlLogData offsetLogData)
{
int chunkSize = 5000; // TODO: Base this on maxDataNodes/maxDataPoints once issue #1957 is implemented.
List<WitsmlLogs> batchedQueries = offsetLogData.Data.Chunk(chunkSize).Select(chunk =>
new WitsmlLogs
{
Logs = new WitsmlLog
{
Uid = log.Uid,
UidWell = log.UidWell,
UidWellbore = log.UidWellbore,
LogData = new WitsmlLogData
{
MnemonicList = offsetLogData.MnemonicList,
UnitList = offsetLogData.UnitList,
Data = chunk.ToList(),
}
}.AsItemInList()
}
).ToList();

return batchedQueries;
}

private static WitsmlLogData OffsetLogData(WitsmlLogData logData, double depthOffset, TimeSpan timeOffset, bool isDepthLog)
{
List<WitsmlData> offsetLogData = new();
Expand Down
33 changes: 7 additions & 26 deletions Src/WitsmlExplorer.Api/Workers/SpliceLogsWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -276,35 +276,16 @@ private static List<WitsmlLogCurveInfo> GetNewLogCurveInfo(WitsmlLogs logHeaders

private async Task AddDataToLog(string wellUid, string wellboreUid, string logUid, WitsmlLogData data)
{
var batchSize = 5000; // Use maxDataNodes and maxDataPoints to calculate batchSize when supported by the API.
var dataRows = data.Data;
for (int i = 0; i < dataRows.Count; i += batchSize)
var mnemonics = data.MnemonicList.Split(CommonConstants.DataSeparator).ToList();
var chunkMaxSize = await GetMaxBatchSize(mnemonics.Count, CommonConstants.WitsmlFunctionType.WMLSUpdateInStore, CommonConstants.WitsmlQueryTypeName.Log);
var mnemonicList = data.MnemonicList;
var queries = LogWorkerTools.GetUpdateLogDataQueries(logUid, wellUid, wellboreUid, data, chunkMaxSize, mnemonicList);

foreach (var query in queries)
{
var currentLogData = dataRows.Skip(i).Take(batchSize).ToList();
WitsmlLogs copyNewCurvesQuery = CreateAddLogDataRowsQuery(wellUid, wellboreUid, logUid, data, currentLogData);
QueryResult result = await RequestUtils.WithRetry(async () => await GetTargetWitsmlClientOrThrow().UpdateInStoreAsync(copyNewCurvesQuery), Logger);
QueryResult result = await RequestUtils.WithRetry(async () => await GetTargetWitsmlClientOrThrow().UpdateInStoreAsync(query), Logger);
if (!result.IsSuccessful) throw new ArgumentException($"Could not add log data to the new log. {result.Reason}");
}
}

private static WitsmlLogs CreateAddLogDataRowsQuery(string wellUid, string wellboreUid, string logUid, WitsmlLogData logData, List<WitsmlData> currentLogData)
{
return new()
{
Logs = new List<WitsmlLog> {
new(){
UidWell = wellUid,
UidWellbore = wellboreUid,
Uid = logUid,
LogData = new WitsmlLogData
{
MnemonicList = logData.MnemonicList,
UnitList = logData.UnitList,
Data = currentLogData
}
}
}
};
}
}
}
23 changes: 23 additions & 0 deletions Src/WitsmlExplorer.Api/Workers/Tools/LogWorkerTools.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,28 @@ public static double CalculateProgressBasedOnIndex(WitsmlLog log, WitsmlLogData
return (DateTime.Parse(index) - DateTime.Parse(startIndex)).TotalMilliseconds / (DateTime.Parse(endIndex) - DateTime.Parse(startIndex)).TotalMilliseconds;
}
}

public static List<WitsmlLogs> GetUpdateLogDataQueries(string uid, string uidWell, string uidWellbore, WitsmlLogData logData, int chunkSize, string mnemonicList)
{
List<WitsmlLogs> batchedQueries = logData.Data.Chunk(chunkSize).Select(chunk =>
new WitsmlLogs
{
Logs = new WitsmlLog
{
Uid = uid,
UidWell = uidWell,
UidWellbore = uidWellbore,
LogData = new WitsmlLogData
{
MnemonicList = mnemonicList,
UnitList = logData.UnitList,
Data = chunk.ToList(),
}
}.AsItemInList()
}
).ToList();

return batchedQueries;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public CopyLogDataWorkerTests()
_witsmlClient = new Mock<IWitsmlClient>();
witsmlClientProvider.Setup(provider => provider.GetClient()).Returns(_witsmlClient.Object);
witsmlClientProvider.Setup(provider => provider.GetSourceClient()).Returns(_witsmlClient.Object);
LogUtils.SetUpGetServerCapabilites(_witsmlClient);

Mock<ILogger<CopyLogDataJob>> logger = new();
Mock<IDocumentRepository<Server, Guid>> documentRepository = new();
documentRepository.Setup(client => client.GetDocumentsAsync())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public ImportLogDataWorkerTests()
witsmlClientProvider.Setup(provider => provider.GetClient()).Returns(_witsmlClient.Object);
ILoggerFactory loggerFactory = new LoggerFactory();
loggerFactory.AddSerilog(Log.Logger);
LogUtils.SetUpGetServerCapabilites(_witsmlClient);
ILogger<ImportLogDataJob> logger = loggerFactory.CreateLogger<ImportLogDataJob>();
_worker = new ImportLogDataWorker(logger, witsmlClientProvider.Object);
}
Expand Down
31 changes: 31 additions & 0 deletions Tests/WitsmlExplorer.Api.Tests/Workers/LogUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -413,5 +413,36 @@ public static void SetupGetDepthIndexed(Mock<IWitsmlClient> witsmlClient, Func<W
}.AsItemInList()
});
}

public static void SetUpGetServerCapabilites(Mock<IWitsmlClient> witsmlClient)
{
var serverCapabalities = new WitsmlCapServers()
{
ServerCapabilities = new List<WitsmlServerCapabilities>()
{
new WitsmlServerCapabilities()
{
Functions = new List<WitsmlFunction>()
{
new WitsmlFunction()
{
DataObjects = new List<WitsmlFunctionDataObject>()
{
new WitsmlFunctionDataObject()
{
MaxDataNodes = 10000,
MaxDataPoints = 8000000,
Name = "log"
}
},
Name = "WMLS_UpdateInStore"
}
}
}
}

};
witsmlClient.Setup(client => client.GetCap()).ReturnsAsync(serverCapabalities);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public OffsetLogCurveWorkerTests()
witsmlClientProvider.Setup(provider => provider.GetClient()).Returns(_witsmlClient.Object);
ILoggerFactory loggerFactory = new LoggerFactory();
loggerFactory.AddSerilog(Log.Logger);
LogUtils.SetUpGetServerCapabilites(_witsmlClient);
ILogger<OffsetLogCurveJob> logger = loggerFactory.CreateLogger<OffsetLogCurveJob>();
_worker = new OffsetLogCurveWorker(logger, witsmlClientProvider.Object, null, null);
}
Expand Down
Loading

0 comments on commit 38c43e0

Please sign in to comment.