Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

maxdatapoints #2499

Merged
merged 15 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Src/Witsml/CommonConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,14 @@ public static class Unit
public const string Feet = "ft";
public const string Second = "s";
}

public static class WitsmlQueryTypeName
{
public const string Log = "log";
}

public static class WitsmlFunctionType
{
public const string WMLSUpdateInStore = "WMLS_UpdateInStore";
};
}
37 changes: 36 additions & 1 deletion Src/WitsmlExplorer.Api/Workers/BaseWorker.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

using Microsoft.Extensions.Logging;

using Witsml;
using Witsml.Data;
using Witsml.Extensions;

using WitsmlExplorer.Api.Extensions;
using WitsmlExplorer.Api.Jobs;
Expand All @@ -21,19 +24,24 @@ public abstract class BaseWorker<T> where T : Job
protected ILogger<T> Logger { get; }
private IWitsmlClientProvider WitsmlClientProvider { get; }

private WitsmlCapServers _targetServerCapabilities;

public BaseWorker(ILogger<T> logger = null)
{
Logger = logger;
}

public BaseWorker(IWitsmlClientProvider witsmlClientProvider, ILogger<T> logger = null)
{
Logger = logger;
WitsmlClientProvider = witsmlClientProvider;
}

protected IWitsmlClient GetTargetWitsmlClientOrThrow()
{
return WitsmlClientProvider.GetClient() ?? throw new WitsmlClientProviderException($"Missing Target WitsmlClient for {typeof(T)}", (int)HttpStatusCode.Unauthorized, ServerType.Target);
}

protected IWitsmlClient GetSourceWitsmlClientOrThrow()
{
return WitsmlClientProvider.GetSourceClient() ?? throw new WitsmlClientProviderException($"Missing Source WitsmlClient for {typeof(T)}", (int)HttpStatusCode.Unauthorized, ServerType.Source);
Expand All @@ -59,6 +67,33 @@ protected string CancellationReason()
{
return "The job was cancelled by the user.";
}
protected async Task<WitsmlCapServers> GetTargetServerCapabilities()
{
if (_targetServerCapabilities == null)
{
_targetServerCapabilities = await GetTargetWitsmlClientOrThrow().GetCap();
}
return _targetServerCapabilities;
}

protected async Task<int> GetMaxBatchSize(int objectsCount, string functionType, string queryTypeName)
{
var targetServerCapabilities = await GetTargetServerCapabilities();
var serverCapabilites =
targetServerCapabilities.ServerCapabilities;

var functions = serverCapabilites.Select(x => x.Functions.Find(y => y.Name.Equals(functionType)));
var objectCapabilities = functions.Select(x =>
x.DataObjects.Find(y => y.Name.Equals(queryTypeName)));

var maxDataRows = objectCapabilities.FirstOrDefault().MaxDataNodes;
var maxDataPoints = objectCapabilities.FirstOrDefault().MaxDataPoints;

var maxBatchSize =
Math.Min(maxDataRows, maxDataPoints / objectsCount);
return maxBatchSize;
}

public async Task<(Task<(WorkerResult, RefreshAction)>, Job)> SetupWorker(Stream jobStream, CancellationToken? cancellationToken = null)
{
T job = await jobStream.Deserialize<T>();
Expand Down
25 changes: 14 additions & 11 deletions Src/WitsmlExplorer.Api/Workers/Copy/CopyLogDataWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -175,25 +175,28 @@ private async Task<CopyResult> CopyLogData(WitsmlLog sourceLog, WitsmlLog target

await using LogDataReader logDataReader = new(GetSourceWitsmlClientOrThrow(), sourceLog, mnemonics, Logger);
WitsmlLogData sourceLogData = await logDataReader.GetNextBatch();
var chunkMaxSize = await GetMaxBatchSize(mnemonics.Count, CommonConstants.WitsmlFunctionType.WMLSUpdateInStore, CommonConstants.WitsmlQueryTypeName.Log);

while (sourceLogData != null)
{
var mnemonicList = targetLog.IndexCurve.Value + sourceLogData.MnemonicList[sourceLogData.MnemonicList.IndexOf(CommonConstants.DataSeparator, StringComparison.InvariantCulture)..];
var updateLogDataQueries = LogWorkerTools.GetUpdateLogDataQueries(targetLog.Uid, targetLog.UidWell, targetLog.UidWellbore, sourceLogData, chunkMaxSize, mnemonicList);
if (cancellationToken is { IsCancellationRequested: true })
{
return new CopyResult { Success = false, NumberOfRowsCopied = numberOfDataRowsCopied, ErrorReason = CancellationReason() };
}
WitsmlLogs copyNewCurvesQuery = CreateCopyQuery(targetLog, sourceLogData);
QueryResult result = await RequestUtils.WithRetry(async () => await GetTargetWitsmlClientOrThrow().UpdateInStoreAsync(copyNewCurvesQuery), Logger);
if (result.IsSuccessful)
{
numberOfDataRowsCopied += sourceLogData.Data.Count;
UpdateJobProgress(job, sourceLog, sourceLogData);
}
else
foreach (var query in updateLogDataQueries)
{
Logger.LogError("Failed to copy log data. - {Description} - Current index: {StartIndex}", job.Description(), logDataReader.StartIndex);
return new CopyResult { Success = false, NumberOfRowsCopied = numberOfDataRowsCopied, ErrorReason = result.Reason };
}

var result = await RequestUtils.WithRetry(async () => await GetTargetWitsmlClientOrThrow().UpdateInStoreAsync(query), Logger);
if (!result.IsSuccessful)
{
Logger.LogError("Failed to copy log data. - {Description} - Current index: {StartIndex}", job.Description(), logDataReader.StartIndex);
return new CopyResult { Success = false, NumberOfRowsCopied = numberOfDataRowsCopied, ErrorReason = result.Reason };
}
}
numberOfDataRowsCopied += sourceLogData.Data.Count;
UpdateJobProgress(job, sourceLog, sourceLogData);
sourceLogData = await logDataReader.GetNextBatch();
}

Expand Down
46 changes: 18 additions & 28 deletions Src/WitsmlExplorer.Api/Workers/ImportLogDataWorker.cs
eliasbruvik marked this conversation as resolved.
Show resolved Hide resolved
eliasbruvik marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,24 @@ public ImportLogDataWorker(ILogger<ImportLogDataJob> logger, IWitsmlClientProvid
}
}

//Todo: find a way to determine the maximum amount of rows that can be sent to the WITSML server then pass that amount to the CreateImportQueries method
WitsmlLogs[] queries = CreateImportQueries(job, chunkSize).ToArray();
var dataRows = job.DataRows
.Where(d => d.Count() > 1)
.Select(row => new WitsmlData
{
Data = string.Join(CommonConstants.DataSeparator, row)
});

var logData = new WitsmlLogData()
{
Data = dataRows.ToList(),
UnitList = string.Join(CommonConstants.DataSeparator, job.Units)
};
var mnemonicList =
string.Join(CommonConstants.DataSeparator, job.Mnemonics);

var chunkMaxSize = await GetMaxBatchSize(job.Mnemonics.Count, CommonConstants.WitsmlFunctionType.WMLSUpdateInStore, CommonConstants.WitsmlQueryTypeName.Log);

var queries = LogWorkerTools.GetUpdateLogDataQueries(witsmlLog.Uid, witsmlLog.UidWell, witsmlLog.UidWellbore, logData, chunkMaxSize, mnemonicList).ToArray();

for (int i = 0; i < queries.Length; i++)
{
Expand Down Expand Up @@ -99,32 +115,6 @@ private async Task<WitsmlLog> GetLogHeader(string wellUid, string wellboreUid, s
return result?.Logs.FirstOrDefault();
}

private static IEnumerable<WitsmlLogs> CreateImportQueries(ImportLogDataJob job, int chunkSize)
{
return job.DataRows
.Where(d => d.Count() > 1)
.Select(row => new WitsmlData { Data = string.Join(CommonConstants.DataSeparator, row) })
.Chunk(chunkSize)
.Select(logData => new WitsmlLogs
{
Logs = new List<WitsmlLog>
{
new WitsmlLog
{
Uid = job.TargetLog.Uid,
UidWellbore = job.TargetLog.WellboreUid,
UidWell = job.TargetLog.WellUid,
LogData = new WitsmlLogData
{
Data = logData.ToList(),
MnemonicList = string.Join(CommonConstants.DataSeparator, job.Mnemonics),
UnitList = string.Join(CommonConstants.DataSeparator, job.Units)
}
}
},
});
}

private static WitsmlLogs CreateAddMnemonicsQuery(ImportLogDataJob job, WitsmlLog witsmlLog)
{
return new WitsmlLogs
Expand Down
31 changes: 6 additions & 25 deletions Src/WitsmlExplorer.Api/Workers/OffsetLogCurveWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using Microsoft.Extensions.Logging;

using Witsml;
using Witsml.Data;
using Witsml.Extensions;
using Witsml.ServiceReference;
Expand Down Expand Up @@ -176,7 +177,11 @@ private async Task DeleteLogData(WitsmlLog log, WitsmlLogCurveInfo logCurveInfo,

private async Task UpdateLogData(WitsmlLog log, WitsmlLogCurveInfo logCurveinfo, WitsmlLogData offsetLogData)
{
var queries = GetUpdateLogDataQueries(log, offsetLogData);
var mnemonics = offsetLogData.MnemonicList.Split(CommonConstants.DataSeparator).ToList();
var chunkMaxSize = await GetMaxBatchSize(mnemonics.Count, CommonConstants.WitsmlFunctionType.WMLSUpdateInStore, CommonConstants.WitsmlQueryTypeName.Log);
var mnemonicList = offsetLogData.MnemonicList;

var queries = LogWorkerTools.GetUpdateLogDataQueries(log.Uid, log.UidWell, log.UidWellbore, offsetLogData, chunkMaxSize, mnemonicList);
foreach (var query in queries)
{

Expand All @@ -188,30 +193,6 @@ private async Task UpdateLogData(WitsmlLog log, WitsmlLogCurveInfo logCurveinfo,
}
}

private static List<WitsmlLogs> GetUpdateLogDataQueries(WitsmlLog log, WitsmlLogData offsetLogData)
{
int chunkSize = 5000; // TODO: Base this on maxDataNodes/maxDataPoints once issue #1957 is implemented.
List<WitsmlLogs> batchedQueries = offsetLogData.Data.Chunk(chunkSize).Select(chunk =>
new WitsmlLogs
{
Logs = new WitsmlLog
{
Uid = log.Uid,
UidWell = log.UidWell,
UidWellbore = log.UidWellbore,
LogData = new WitsmlLogData
{
MnemonicList = offsetLogData.MnemonicList,
UnitList = offsetLogData.UnitList,
Data = chunk.ToList(),
}
}.AsItemInList()
}
).ToList();

return batchedQueries;
}

private static WitsmlLogData OffsetLogData(WitsmlLogData logData, double depthOffset, TimeSpan timeOffset, bool isDepthLog)
{
List<WitsmlData> offsetLogData = new();
Expand Down
33 changes: 7 additions & 26 deletions Src/WitsmlExplorer.Api/Workers/SpliceLogsWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -276,35 +276,16 @@ private static List<WitsmlLogCurveInfo> GetNewLogCurveInfo(WitsmlLogs logHeaders

private async Task AddDataToLog(string wellUid, string wellboreUid, string logUid, WitsmlLogData data)
{
var batchSize = 5000; // Use maxDataNodes and maxDataPoints to calculate batchSize when supported by the API.
var dataRows = data.Data;
for (int i = 0; i < dataRows.Count; i += batchSize)
var mnemonics = data.MnemonicList.Split(CommonConstants.DataSeparator).ToList();
var chunkMaxSize = await GetMaxBatchSize(mnemonics.Count, CommonConstants.WitsmlFunctionType.WMLSUpdateInStore, CommonConstants.WitsmlQueryTypeName.Log);
var mnemonicList = data.MnemonicList;
var queries = LogWorkerTools.GetUpdateLogDataQueries(logUid, wellUid, wellboreUid, data, chunkMaxSize, mnemonicList);

foreach (var query in queries)
{
var currentLogData = dataRows.Skip(i).Take(batchSize).ToList();
WitsmlLogs copyNewCurvesQuery = CreateAddLogDataRowsQuery(wellUid, wellboreUid, logUid, data, currentLogData);
QueryResult result = await RequestUtils.WithRetry(async () => await GetTargetWitsmlClientOrThrow().UpdateInStoreAsync(copyNewCurvesQuery), Logger);
QueryResult result = await RequestUtils.WithRetry(async () => await GetTargetWitsmlClientOrThrow().UpdateInStoreAsync(query), Logger);
if (!result.IsSuccessful) throw new ArgumentException($"Could not add log data to the new log. {result.Reason}");
}
}

private static WitsmlLogs CreateAddLogDataRowsQuery(string wellUid, string wellboreUid, string logUid, WitsmlLogData logData, List<WitsmlData> currentLogData)
{
return new()
{
Logs = new List<WitsmlLog> {
new(){
UidWell = wellUid,
UidWellbore = wellboreUid,
Uid = logUid,
LogData = new WitsmlLogData
{
MnemonicList = logData.MnemonicList,
UnitList = logData.UnitList,
Data = currentLogData
}
}
}
};
}
}
}
23 changes: 23 additions & 0 deletions Src/WitsmlExplorer.Api/Workers/Tools/LogWorkerTools.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,28 @@ public static double CalculateProgressBasedOnIndex(WitsmlLog log, WitsmlLogData
return (DateTime.Parse(index) - DateTime.Parse(startIndex)).TotalMilliseconds / (DateTime.Parse(endIndex) - DateTime.Parse(startIndex)).TotalMilliseconds;
}
}

public static List<WitsmlLogs> GetUpdateLogDataQueries(string uid, string uidWell, string uidWellbore, WitsmlLogData logData, int chunkSize, string mnemonicList)
{
List<WitsmlLogs> batchedQueries = logData.Data.Chunk(chunkSize).Select(chunk =>
new WitsmlLogs
{
Logs = new WitsmlLog
{
Uid = uid,
UidWell = uidWell,
UidWellbore = uidWellbore,
LogData = new WitsmlLogData
{
MnemonicList = mnemonicList,
UnitList = logData.UnitList,
Data = chunk.ToList(),
}
}.AsItemInList()
}
).ToList();

return batchedQueries;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public CopyLogDataWorkerTests()
_witsmlClient = new Mock<IWitsmlClient>();
witsmlClientProvider.Setup(provider => provider.GetClient()).Returns(_witsmlClient.Object);
witsmlClientProvider.Setup(provider => provider.GetSourceClient()).Returns(_witsmlClient.Object);
LogUtils.SetUpGetServerCapabilites(_witsmlClient);

Mock<ILogger<CopyLogDataJob>> logger = new();
Mock<IDocumentRepository<Server, Guid>> documentRepository = new();
documentRepository.Setup(client => client.GetDocumentsAsync())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public ImportLogDataWorkerTests()
witsmlClientProvider.Setup(provider => provider.GetClient()).Returns(_witsmlClient.Object);
ILoggerFactory loggerFactory = new LoggerFactory();
loggerFactory.AddSerilog(Log.Logger);
LogUtils.SetUpGetServerCapabilites(_witsmlClient);
ILogger<ImportLogDataJob> logger = loggerFactory.CreateLogger<ImportLogDataJob>();
_worker = new ImportLogDataWorker(logger, witsmlClientProvider.Object);
}
Expand Down
31 changes: 31 additions & 0 deletions Tests/WitsmlExplorer.Api.Tests/Workers/LogUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -413,5 +413,36 @@ public static void SetupGetDepthIndexed(Mock<IWitsmlClient> witsmlClient, Func<W
}.AsItemInList()
});
}

public static void SetUpGetServerCapabilites(Mock<IWitsmlClient> witsmlClient)
{
var serverCapabalities = new WitsmlCapServers()
{
ServerCapabilities = new List<WitsmlServerCapabilities>()
{
new WitsmlServerCapabilities()
{
Functions = new List<WitsmlFunction>()
{
new WitsmlFunction()
{
DataObjects = new List<WitsmlFunctionDataObject>()
{
new WitsmlFunctionDataObject()
{
MaxDataNodes = 10000,
MaxDataPoints = 8000000,
Name = "log"
}
},
Name = "WMLS_UpdateInStore"
}
}
}
}

};
witsmlClient.Setup(client => client.GetCap()).ReturnsAsync(serverCapabalities);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public OffsetLogCurveWorkerTests()
witsmlClientProvider.Setup(provider => provider.GetClient()).Returns(_witsmlClient.Object);
ILoggerFactory loggerFactory = new LoggerFactory();
loggerFactory.AddSerilog(Log.Logger);
LogUtils.SetUpGetServerCapabilites(_witsmlClient);
ILogger<OffsetLogCurveJob> logger = loggerFactory.CreateLogger<OffsetLogCurveJob>();
_worker = new OffsetLogCurveWorker(logger, witsmlClientProvider.Object, null, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public SpliceLogsWorkerTests()
_logger = new Mock<ILogger<SpliceLogsJob>>();
_witsmlClientProvider.Setup(provider => provider.GetClient()).Returns(_witsmlClient.Object);
_worker = new SpliceLogsWorker(_logger.Object, _witsmlClientProvider.Object);
LogUtils.SetUpGetServerCapabilites(_witsmlClient);
}

[Theory]
Expand Down