diff --git a/src/Apache.IoTDB/SessionPool.cs b/src/Apache.IoTDB/SessionPool.cs index 43b561f..211ad82 100644 --- a/src/Apache.IoTDB/SessionPool.cs +++ b/src/Apache.IoTDB/SessionPool.cs @@ -22,6 +22,7 @@ public class SessionPool private readonly string _username; private readonly string _password; + private bool _enableRpcCompression; private string _zoneId; private readonly string _host; private readonly int _port; @@ -122,6 +123,7 @@ public void CloseDebugMode() public async Task Open(bool enableRpcCompression) { _clients = new ConcurrentClientQueue(); + _enableRpcCompression = enableRpcCompression; for (var index = 0; index < _poolSize; index++) { @@ -278,7 +280,22 @@ public async Task SetStorageGroup(string groupName) } catch (TException e) { - throw new TException($"set storage group {groupName} failed", e); + // try to reconnect + await Open(_enableRpcCompression); + client = _clients.Take(); + try + { + var status = await client.ServiceClient.setStorageGroupAsync(client.SessionId, groupName); + if (_debugMode) + { + _logger.Info("set storage group {0} successfully, server message is {1}", groupName, status.Message); + } + return _utilFunctions.VerifySuccess(status, SuccessCode); + } + catch (TException ex) + { + throw new TException("Error occurs when setting storage group", ex); + } } finally { @@ -312,7 +329,22 @@ public async Task CreateTimeSeries( } catch (TException e) { - throw new TException($"create time series {tsPath} failed", e); + await Open(_enableRpcCompression); + client = _clients.Take(); + req.SessionId = client.SessionId; + try + { + var status = await client.ServiceClient.createTimeseriesAsync(req); + if (_debugMode) + { + _logger.Info("creating time series {0} successfully, server message is {1}", tsPath, status.Message); + } + return _utilFunctions.VerifySuccess(status, SuccessCode); + } + catch (TException ex) + { + throw new TException("Error occurs when creating time series", ex); + } } finally { @@ -353,7 +385,22 @@ public async Task CreateAlignedTimeseriesAsync( } catch (TException e) { - throw new TException($"create aligned time series {prefixPath} failed", e); + await Open(_enableRpcCompression); + client = _clients.Take(); + req.SessionId = client.SessionId; + try + { + var status = await client.ServiceClient.createAlignedTimeseriesAsync(req); + if (_debugMode) + { + _logger.Info("creating aligned time series {0} successfully, server message is {1}", prefixPath, status.Message); + } + return _utilFunctions.VerifySuccess(status, SuccessCode); + } + catch (TException ex) + { + throw new TException("Error occurs when creating aligned time series", ex); + } } finally { @@ -379,7 +426,25 @@ public async Task DeleteStorageGroupAsync(string groupName) } catch (TException e) { - throw new TException($"delete storage group {groupName} failed", e); + await Open(_enableRpcCompression); + client = _clients.Take(); + try + { + var status = await client.ServiceClient.deleteStorageGroupsAsync( + client.SessionId, + new List { groupName }); + + if (_debugMode) + { + _logger.Info($"delete storage group {groupName} successfully, server message is {status?.Message}"); + } + + return _utilFunctions.VerifySuccess(status, SuccessCode); + } + catch (TException ex) + { + throw new TException("Error occurs when deleting storage group", ex); + } } finally { @@ -407,7 +472,26 @@ public async Task DeleteStorageGroupsAsync(List groupNames) } catch (TException e) { - throw new TException($"delete storage group(s) {groupNames} failed", e); + await Open(_enableRpcCompression); + client = _clients.Take(); + try + { + var status = await client.ServiceClient.deleteStorageGroupsAsync(client.SessionId, groupNames); + + if (_debugMode) + { + _logger.Info( + "delete storage group(s) {0} successfully, server message is {1}", + groupNames, + status.Message); + } + + return _utilFunctions.VerifySuccess(status, SuccessCode); + } + catch (TException ex) + { + throw new TException("Error occurs when deleting storage group(s)", ex); + } } finally { @@ -441,7 +525,22 @@ public async Task CreateMultiTimeSeriesAsync( } catch (TException e) { - throw new TException($"create multiple time series {tsPathLst} failed", e); + await Open(_enableRpcCompression); + client = _clients.Take(); + req.SessionId = client.SessionId; + try + { + var status = await client.ServiceClient.createMultiTimeseriesAsync(req); + if (_debugMode) + { + _logger.Info("creating multiple time series {0}, server message is {1}", tsPathLst, status.Message); + } + return _utilFunctions.VerifySuccess(status, SuccessCode); + } + catch (TException ex) + { + throw new TException("Error occurs when creating multiple time series", ex); + } } finally { @@ -466,7 +565,23 @@ public async Task DeleteTimeSeriesAsync(List pathList) } catch (TException e) { - throw new TException($"delete time series {pathList} failed", e); + await Open(_enableRpcCompression); + client = _clients.Take(); + try + { + var status = await client.ServiceClient.deleteTimeseriesAsync(client.SessionId, pathList); + + if (_debugMode) + { + _logger.Info("deleting multiple time series {0}, server message is {1}", pathList, status.Message); + } + + return _utilFunctions.VerifySuccess(status, SuccessCode); + } + catch (TException ex) + { + throw new TException("Error occurs when deleting multiple time series", ex); + } } finally { @@ -515,7 +630,27 @@ public async Task DeleteDataAsync(List tsPathLst, long startTime, l } catch (TException e) { - throw new TException("data deletion fails because", e); + await Open(_enableRpcCompression); + client = _clients.Take(); + req.SessionId = client.SessionId; + try + { + var status = await client.ServiceClient.deleteDataAsync(req); + + if (_debugMode) + { + _logger.Info( + "delete data from {0}, server message is {1}", + tsPathLst, + status.Message); + } + + return _utilFunctions.VerifySuccess(status, SuccessCode); + } + catch (TException ex) + { + throw new TException("Error occurs when deleting data", ex); + } } finally { @@ -542,7 +677,24 @@ public async Task InsertRecordAsync(string deviceId, RowRecord record) } catch (TException e) { - throw new TException("Record insertion failed", e); + await Open(_enableRpcCompression); + client = _clients.Take(); + req.SessionId = client.SessionId; + try + { + var status = await client.ServiceClient.insertRecordAsync(req); + + if (_debugMode) + { + _logger.Info("insert one record to device {0}, server message: {1}", deviceId, status.Message); + } + + return _utilFunctions.VerifySuccess(status, SuccessCode); + } + catch (TException ex) + { + throw new TException("Error occurs when inserting record", ex); + } } finally { @@ -570,7 +722,24 @@ public async Task InsertAlignedRecordAsync(string deviceId, RowRecord recor } catch (TException e) { - throw new TException("Record insertion failed", e); + await Open(_enableRpcCompression); + client = _clients.Take(); + req.SessionId = client.SessionId; + try + { + var status = await client.ServiceClient.insertRecordAsync(req); + + if (_debugMode) + { + _logger.Info("insert one record to device {0}, server message: {1}", deviceId, status.Message); + } + + return _utilFunctions.VerifySuccess(status, SuccessCode); + } + catch (TException ex) + { + throw new TException("Error occurs when inserting record", ex); + } } finally { @@ -619,7 +788,24 @@ public async Task InsertRecordsAsync(List deviceId, List } catch (TException e) { - throw new TException("Multiple records insertion failed", e); + await Open(_enableRpcCompression); + client = _clients.Take(); + request.SessionId = client.SessionId; + try + { + var status = await client.ServiceClient.insertRecordsAsync(request); + + if (_debugMode) + { + _logger.Info("insert multiple records to devices {0}, server message: {1}", deviceId, status.Message); + } + + return _utilFunctions.VerifySuccess(status, SuccessCode); + } + catch (TException ex) + { + throw new TException("Error occurs when inserting records", ex); + } } finally { @@ -648,7 +834,24 @@ public async Task InsertAlignedRecordsAsync(List deviceId, List InsertTabletAsync(Tablet tablet) } catch (TException e) { - throw new TException("Tablet insertion failed", e); + await Open(_enableRpcCompression); + client = _clients.Take(); + req.SessionId = client.SessionId; + try + { + var status = await client.ServiceClient.insertTabletAsync(req); + + if (_debugMode) + { + _logger.Info("insert one tablet to device {0}, server message: {1}", tablet.DeviceId, status.Message); + } + + return _utilFunctions.VerifySuccess(status, SuccessCode); + } + catch (TException ex) + { + throw new TException("Error occurs when inserting tablet", ex); + } } finally { @@ -711,7 +931,24 @@ public async Task InsertAlignedTabletAsync(Tablet tablet) } catch (TException e) { - throw new TException("Aligned tablet insertion failed", e); + await Open(_enableRpcCompression); + client = _clients.Take(); + req.SessionId = client.SessionId; + try + { + var status = await client.ServiceClient.insertTabletAsync(req); + + if (_debugMode) + { + _logger.Info("insert one aligned tablet to device {0}, server message: {1}", tablet.DeviceId, status.Message); + } + + return _utilFunctions.VerifySuccess(status, SuccessCode); + } + catch (TException ex) + { + throw new TException("Error occurs when inserting tablet", ex); + } } finally { @@ -768,9 +1005,24 @@ public async Task InsertTabletsAsync(List tabletLst) } catch (TException e) { - _clients.Add(client); + await Open(_enableRpcCompression); + client = _clients.Take(); + req.SessionId = client.SessionId; + try + { + var status = await client.ServiceClient.insertTabletsAsync(req); - throw new TException("Multiple tablets insertion failed", e); + if (_debugMode) + { + _logger.Info("insert multiple tablets, message: {0}", status.Message); + } + + return _utilFunctions.VerifySuccess(status, SuccessCode); + } + catch (TException ex) + { + throw new TException("Error occurs when inserting tablets", ex); + } } finally { @@ -797,9 +1049,24 @@ public async Task InsertAlignedTabletsAsync(List tabletLst) } catch (TException e) { - _clients.Add(client); + await Open(_enableRpcCompression); + client = _clients.Take(); + req.SessionId = client.SessionId; + try + { + var status = await client.ServiceClient.insertTabletsAsync(req); - throw new TException("Multiple aligned tablets insertion failed", e); + if (_debugMode) + { + _logger.Info("insert multiple aligned tablets, message: {0}", status.Message); + } + + return _utilFunctions.VerifySuccess(status, SuccessCode); + } + catch (TException ex) + { + throw new TException("Error occurs when inserting tablets", ex); + } } finally { @@ -861,7 +1128,24 @@ public async Task InsertRecordsOfOneDeviceSortedAsync(string deviceId, List } catch (TException e) { - throw new TException("Sorted records of one device insertion failed", e); + await Open(_enableRpcCompression); + client = _clients.Take(); + req.SessionId = client.SessionId; + try + { + var status = await client.ServiceClient.insertRecordsOfOneDeviceAsync(req); + + if (_debugMode) + { + _logger.Info("insert records of one device, message: {0}", status.Message); + } + + return _utilFunctions.VerifySuccess(status, SuccessCode); + } + catch (TException ex) + { + throw new TException("Error occurs when inserting records of one device", ex); + } } finally { @@ -895,7 +1179,24 @@ public async Task InsertAlignedRecordsOfOneDeviceSortedAsync(string deviceI } catch (TException e) { - throw new TException("Sorted aligned records of one device insertion failed", e); + await Open(_enableRpcCompression); + client = _clients.Take(); + req.SessionId = client.SessionId; + try + { + var status = await client.ServiceClient.insertRecordsOfOneDeviceAsync(req); + + if (_debugMode) + { + _logger.Info("insert aligned records of one device, message: {0}", status.Message); + } + + return _utilFunctions.VerifySuccess(status, SuccessCode); + } + catch (TException ex) + { + throw new TException("Error occurs when inserting aligned records of one device", ex); + } } finally { @@ -927,7 +1228,24 @@ public async Task TestInsertRecordAsync(string deviceId, RowRecord record) } catch (TException e) { - throw new TException("Record insertion failed", e); + await Open(_enableRpcCompression); + client = _clients.Take(); + req.SessionId = client.SessionId; + try + { + var status = await client.ServiceClient.testInsertRecordAsync(req); + + if (_debugMode) + { + _logger.Info("insert one record to device {0}, server message: {1}", deviceId, status.Message); + } + + return _utilFunctions.VerifySuccess(status, SuccessCode); + } + catch (TException ex) + { + throw new TException("Error occurs when test inserting one record", ex); + } } finally { @@ -953,7 +1271,24 @@ public async Task TestInsertRecordsAsync(List deviceId, List TestInsertTabletAsync(Tablet tablet) } catch (TException e) { - throw new TException("Tablet insertion failed", e); + await Open(_enableRpcCompression); + client = _clients.Take(); + req.SessionId = client.SessionId; + try + { + var status = await client.ServiceClient.testInsertTabletAsync(req); + + if (_debugMode) + { + _logger.Info("insert one tablet to device {0}, server message: {1}", tablet.DeviceId, + status.Message); + } + + return _utilFunctions.VerifySuccess(status, SuccessCode); + } + catch (TException ex) + { + throw new TException("Error occurs when test inserting one tablet", ex); + } } finally { @@ -1008,7 +1361,24 @@ public async Task TestInsertTabletsAsync(List tabletLst) } catch (TException e) { - throw new TException("Multiple tablets insertion failed", e); + await Open(_enableRpcCompression); + client = _clients.Take(); + req.SessionId = client.SessionId; + try + { + var status = await client.ServiceClient.testInsertTabletsAsync(req); + + if (_debugMode) + { + _logger.Info("insert multiple tablets, message: {0}", status.Message); + } + + return _utilFunctions.VerifySuccess(status, SuccessCode); + } + catch (TException ex) + { + throw new TException("Error occurs when test inserting multiple tablets", ex); + } } finally { @@ -1034,7 +1404,20 @@ public async Task ExecuteQueryStatementAsync(string sql) { _clients.Add(client); - throw new TException("could not execute query statement", e); + await Open(_enableRpcCompression); + client = _clients.Take(); + req.SessionId = client.SessionId; + req.StatementId = client.StatementId; + try + { + resp = await client.ServiceClient.executeQueryStatementAsync(req); + status = resp.Status; + } + catch (TException ex) + { + _clients.Add(client); + throw new TException("Error occurs when executing query statement", ex); + } } if (_utilFunctions.VerifySuccess(status, SuccessCode) == -1) @@ -1073,7 +1456,26 @@ public async Task ExecuteNonQueryStatementAsync(string sql) } catch (TException e) { - throw new TException("execution of non-query statement failed", e); + await Open(_enableRpcCompression); + client = _clients.Take(); + req.SessionId = client.SessionId; + req.StatementId = client.StatementId; + try + { + var resp = await client.ServiceClient.executeUpdateStatementAsync(req); + var status = resp.Status; + + if (_debugMode) + { + _logger.Info("execute non-query statement {0} message: {1}", sql, status.Message); + } + + return _utilFunctions.VerifySuccess(status, SuccessCode); + } + catch (TException ex) + { + throw new TException("Error occurs when executing non-query statement", ex); + } } finally { @@ -1098,7 +1500,24 @@ public async Task CreateSchemaTemplateAsync(Template template) } catch (TException e) { - throw new TException("schema template creation failed", e); + await Open(_enableRpcCompression); + client = _clients.Take(); + req.SessionId = client.SessionId; + try + { + var status = await client.ServiceClient.createSchemaTemplateAsync(req); + + if (_debugMode) + { + _logger.Info("create schema template {0} message: {1}", template.Name, status.Message); + } + + return _utilFunctions.VerifySuccess(status, SuccessCode); + } + catch (TException ex) + { + throw new TException("Error occurs when creating schema template", ex); + } } finally { @@ -1123,7 +1542,24 @@ public async Task DropSchemaTemplateAsync(string templateName) } catch (TException e) { - throw new TException("schema template drop failed", e); + await Open(_enableRpcCompression); + client = _clients.Take(); + req.SessionId = client.SessionId; + try + { + var status = await client.ServiceClient.dropSchemaTemplateAsync(req); + + if (_debugMode) + { + _logger.Info("drop schema template {0} message: {1}", templateName, status.Message); + } + + return _utilFunctions.VerifySuccess(status, SuccessCode); + } + catch (TException ex) + { + throw new TException("Error occurs when dropping schema template", ex); + } } finally { @@ -1148,7 +1584,24 @@ public async Task SetSchemaTemplateAsync(string templateName, string prefix } catch (TException e) { - throw new TException("schema template setting failed", e); + await Open(_enableRpcCompression); + client = _clients.Take(); + req.SessionId = client.SessionId; + try + { + var status = await client.ServiceClient.setSchemaTemplateAsync(req); + + if (_debugMode) + { + _logger.Info("set schema template {0} message: {1}", templateName, status.Message); + } + + return _utilFunctions.VerifySuccess(status, SuccessCode); + } + catch (TException ex) + { + throw new TException("Error occurs when setting schema template", ex); + } } finally { @@ -1173,7 +1626,24 @@ public async Task UnsetSchemaTemplateAsync(string prefixPath, string templa } catch (TException e) { - throw new TException("schema template unsetting failed", e); + await Open(_enableRpcCompression); + client = _clients.Take(); + req.SessionId = client.SessionId; + try + { + var status = await client.ServiceClient.unsetSchemaTemplateAsync(req); + + if (_debugMode) + { + _logger.Info("unset schema template {0} message: {1}", templateName, status.Message); + } + + return _utilFunctions.VerifySuccess(status, SuccessCode); + } + catch (TException ex) + { + throw new TException("Error occurs when unsetting schema template", ex); + } } finally { @@ -1201,7 +1671,24 @@ public async Task AddAlignedMeasurementsInTemplateAsync(string templateName } catch (TException e) { - throw new TException("add aligned measurements in template failed", e); + await Open(_enableRpcCompression); + client = _clients.Take(); + req.SessionId = client.SessionId; + try + { + var status = await client.ServiceClient.appendSchemaTemplateAsync(req); + + if (_debugMode) + { + _logger.Info("add aligned measurements in template {0} message: {1}", templateName, status.Message); + } + + return _utilFunctions.VerifySuccess(status, SuccessCode); + } + catch (TException ex) + { + throw new TException("Error occurs when adding aligned measurements in template", ex); + } } finally { @@ -1230,7 +1717,24 @@ public async Task AddUnalignedMeasurementsInTemplateAsync(string templateNa } catch (TException e) { - throw new TException("add unaligned measurements in template failed", e); + await Open(_enableRpcCompression); + client = _clients.Take(); + req.SessionId = client.SessionId; + try + { + var status = await client.ServiceClient.appendSchemaTemplateAsync(req); + + if (_debugMode) + { + _logger.Info("add unaligned measurements in template {0} message: {1}", templateName, status.Message); + } + + return _utilFunctions.VerifySuccess(status, SuccessCode); + } + catch (TException ex) + { + throw new TException("Error occurs when adding unaligned measurements in template", ex); + } } finally { @@ -1254,7 +1758,24 @@ public async Task DeleteNodeInTemplateAsync(string templateName, string pat } catch (TException e) { - throw new TException("delete node in template failed", e); + await Open(_enableRpcCompression); + client = _clients.Take(); + req.SessionId = client.SessionId; + try + { + var status = await client.ServiceClient.pruneSchemaTemplateAsync(req); + + if (_debugMode) + { + _logger.Info("delete node in template {0} message: {1}", templateName, status.Message); + } + + return _utilFunctions.VerifySuccess(status, SuccessCode); + } + catch (TException ex) + { + throw new TException("Error occurs when deleting node in template", ex); + } } finally { @@ -1280,7 +1801,25 @@ public async Task CountMeasurementsInTemplateAsync(string name) } catch (TException e) { - throw new TException("count measurements in template failed", e); + await Open(_enableRpcCompression); + client = _clients.Take(); + req.SessionId = client.SessionId; + try + { + var resp = await client.ServiceClient.querySchemaTemplateAsync(req); + var status = resp.Status; + if (_debugMode) + { + _logger.Info("count measurements in template {0} message: {1}", name, status.Message); + } + + _utilFunctions.VerifySuccess(status, SuccessCode); + return resp.Count; + } + catch (TException ex) + { + throw new TException("Error occurs when counting measurements in template", ex); + } } finally { @@ -1306,7 +1845,25 @@ public async Task IsMeasurementInTemplateAsync(string templateName, string } catch (TException e) { - throw new TException("is measurement in template failed", e); + await Open(_enableRpcCompression); + client = _clients.Take(); + req.SessionId = client.SessionId; + try + { + var resp = await client.ServiceClient.querySchemaTemplateAsync(req); + var status = resp.Status; + if (_debugMode) + { + _logger.Info("is measurement in template {0} message: {1}", templateName, status.Message); + } + + _utilFunctions.VerifySuccess(status, SuccessCode); + return resp.Result; + } + catch (TException ex) + { + throw new TException("Error occurs when checking measurement in template", ex); + } } finally { @@ -1333,7 +1890,25 @@ public async Task IsPathExistInTemplate(string templateName, string path) } catch (TException e) { - throw new TException("is path exist in template failed", e); + await Open(_enableRpcCompression); + client = _clients.Take(); + req.SessionId = client.SessionId; + try + { + var resp = await client.ServiceClient.querySchemaTemplateAsync(req); + var status = resp.Status; + if (_debugMode) + { + _logger.Info("is path exist in template {0} message: {1}", templateName, status.Message); + } + + _utilFunctions.VerifySuccess(status, SuccessCode); + return resp.Result; + } + catch (TException ex) + { + throw new TException("Error occurs when checking path exist in template", ex); + } } finally { @@ -1360,7 +1935,25 @@ public async Task> ShowMeasurementsInTemplateAsync(string templateN } catch (TException e) { - throw new TException("get measurements in template failed", e); + await Open(_enableRpcCompression); + client = _clients.Take(); + req.SessionId = client.SessionId; + try + { + var resp = await client.ServiceClient.querySchemaTemplateAsync(req); + var status = resp.Status; + if (_debugMode) + { + _logger.Info("get measurements in template {0} message: {1}", templateName, status.Message); + } + + _utilFunctions.VerifySuccess(status, SuccessCode); + return resp.Measurements; + } + catch (TException ex) + { + throw new TException("Error occurs when getting measurements in template", ex); + } } finally { @@ -1385,7 +1978,25 @@ public async Task> ShowAllTemplatesAsync() } catch (TException e) { - throw new TException("get all templates failed", e); + await Open(_enableRpcCompression); + client = _clients.Take(); + req.SessionId = client.SessionId; + try + { + var resp = await client.ServiceClient.querySchemaTemplateAsync(req); + var status = resp.Status; + if (_debugMode) + { + _logger.Info("get all templates message: {0}", status.Message); + } + + _utilFunctions.VerifySuccess(status, SuccessCode); + return resp.Measurements; + } + catch (TException ex) + { + throw new TException("Error occurs when getting all templates", ex); + } } finally { @@ -1410,7 +2021,25 @@ public async Task> ShowPathsTemplateSetOnAsync(string templateName) } catch (TException e) { - throw new TException("get paths template set on failed", e); + await Open(_enableRpcCompression); + client = _clients.Take(); + req.SessionId = client.SessionId; + try + { + var resp = await client.ServiceClient.querySchemaTemplateAsync(req); + var status = resp.Status; + if (_debugMode) + { + _logger.Info("get paths template set on {0} message: {1}", templateName, status.Message); + } + + _utilFunctions.VerifySuccess(status, SuccessCode); + return resp.Measurements; + } + catch (TException ex) + { + throw new TException("Error occurs when getting paths template set on", ex); + } } finally { @@ -1435,7 +2064,25 @@ public async Task> ShowPathsTemplateUsingOnAsync(string templateNam } catch (TException e) { - throw new TException("get paths template using on failed", e); + await Open(_enableRpcCompression); + client = _clients.Take(); + req.SessionId = client.SessionId; + try + { + var resp = await client.ServiceClient.querySchemaTemplateAsync(req); + var status = resp.Status; + if (_debugMode) + { + _logger.Info("get paths template using on {0} message: {1}", templateName, status.Message); + } + + _utilFunctions.VerifySuccess(status, SuccessCode); + return resp.Measurements; + } + catch (TException ex) + { + throw new TException("Error occurs when getting paths template using on", ex); + } } finally {