Skip to content

Commit

Permalink
Merge pull request #88 from eedalong/lz/1.0
Browse files Browse the repository at this point in the history
支持1.0版本的IoTDB客户端
  • Loading branch information
lausannel committed Jan 16, 2023
2 parents 937f263 + 57f7016 commit 604e0d6
Show file tree
Hide file tree
Showing 55 changed files with 12,070 additions and 2,788 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:

- name: Set Docker & Run Test
run: |
docker-compose -f docker-compose.yml up --build --abort-on-container-exit --remove-orphans
docker network create --subnet 172.18.0.0/24 iotdb-network && docker-compose -f docker-compose.yml up --build --abort-on-container-exit --remove-orphans
- name: Clean IoTDB & Shut Down Docker
run: |
Expand Down
40 changes: 33 additions & 7 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@ services:
context: .
dockerfile: samples/Apache.IoTDB.Samples/Dockerfile
networks:
- iotdb-network
iotdb-network:
ipv4_address: 172.18.0.2

iotdb:
image: apache/iotdb:0.13.0-node
image: apache/iotdb:1.0.0-datanode
restart: always
container_name: iotdb
container_name: iotdb-dn-1
depends_on:
iotdb-confignode-1:
condition: service_healthy
healthcheck:
test: ["CMD", "ls", "/iotdb/data"]
interval: 3s
Expand All @@ -27,9 +31,31 @@ services:
ports:
- 6667:6667
networks:
- iotdb-network
iotdb-network:
ipv4_address: 172.18.0.3
environment:
- dn_rpc_address=iotdb
- dn_internal_address=iotdb
- dn_target_config_node_list=iotdb-confignode-1:22277

iotdb-confignode-1:
image: apache/iotdb:1.0.0-confignode
restart: always
container_name: iotdb-cn-1
healthcheck:
test: ["CMD", "ls", "/iotdb/data"]
interval: 3s
timeout: 5s
retries: 30
start_period: 30s
networks:
iotdb-network:
ipv4_address: 172.18.0.4
environment:
- cn_internal_address=iotdb-confignode-1
- cn_target_config_node_list=iotdb-confignode-1:22277

networks:
iotdb-network:
driver: bridge

networks:
iotdb-network:
external: true
201 changes: 201 additions & 0 deletions samples/Apache.IoTDB.Samples/SessionPoolTest.AlignedRecord.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,52 @@ public async Task TestInsertAlignedRecord()
await session_pool.Close();
Console.WriteLine("TestInsertAlignedRecordAsync Passed");
}
public async Task TestInsertAlignedStringRecord()
{
var session_pool = new SessionPool(host, port, pool_size);
var status = 0;
await session_pool.Open(false);
if (debug) session_pool.OpenDebugMode();

System.Diagnostics.Debug.Assert(session_pool.IsOpen());
await session_pool.DeleteStorageGroupAsync(test_group_name);

status = await session_pool.CreateAlignedTimeseriesAsync(
string.Format("{0}.{1}", test_group_name, test_device),
new List<string>() { test_measurements[0], test_measurements[1], test_measurements[2] },
new List<TSDataType>() { TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT },
new List<TSEncoding>() { TSEncoding.PLAIN, TSEncoding.PLAIN, TSEncoding.PLAIN },
new List<Compressor>() { Compressor.UNCOMPRESSED, Compressor.UNCOMPRESSED, Compressor.UNCOMPRESSED });

System.Diagnostics.Debug.Assert(status == 0);
var measurements = new List<string>
{test_measurements[0], test_measurements[1], test_measurements[2]};
var values = new List<string> { "test_text1", "test_text2", "test_text3" };
var tasks = new List<Task<int>>();
var start_ms = DateTime.Now.Ticks / 10000;
for (var timestamp = 1; timestamp <= fetch_size * processed_size; timestamp++)
{
var task = session_pool.InsertAlignedStringRecordAsync(
string.Format("{0}.{1}", test_group_name, test_device), measurements, values, timestamp);
tasks.Add(task);
}

Task.WaitAll(tasks.ToArray());
var end_ms = DateTime.Now.Ticks / 10000;
Console.WriteLine(string.Format("total insert aligned string record time is {0}", end_ms - start_ms));
var res = await session_pool.ExecuteQueryStatementAsync("select * from " + string.Format("{0}.{1}", test_group_name, test_device));
var res_cnt = 0;
while (res.HasNext())
{
res.Next();
res_cnt++;
}
Console.WriteLine(res_cnt + " " + fetch_size * processed_size);
System.Diagnostics.Debug.Assert(res_cnt == fetch_size * processed_size);
await session_pool.DeleteStorageGroupAsync(test_group_name);
await session_pool.Close();
Console.WriteLine("TestInsertAlignedStringRecordAsync Passed");
}
public async Task TestInsertAlignedRecords()
{
var session_pool = new SessionPool(host, port, pool_size);
Expand Down Expand Up @@ -165,6 +211,87 @@ public async Task TestInsertAlignedRecords()
await session_pool.Close();
Console.WriteLine("TestInsertAlignedRecords Passed!");
}
public async Task TestInsertAlignedStringRecords()
{
var session_pool = new SessionPool(host, port, pool_size);
await session_pool.Open(false);
if (debug) session_pool.OpenDebugMode();

System.Diagnostics.Debug.Assert(session_pool.IsOpen());
var status = 0;
await session_pool.DeleteStorageGroupAsync(test_group_name);

string prefixPath = string.Format("{0}.{1}", test_group_name, test_device);
var measurement_lst = new List<string>() { test_measurements[1], test_measurements[2] };
var data_type_lst = new List<TSDataType>() { TSDataType.TEXT, TSDataType.TEXT };
var encoding_lst = new List<TSEncoding>() { TSEncoding.PLAIN, TSEncoding.PLAIN };
var compressor_lst = new List<Compressor>() { Compressor.SNAPPY, Compressor.SNAPPY };
status = await session_pool.CreateAlignedTimeseriesAsync(prefixPath, measurement_lst, data_type_lst, encoding_lst,
compressor_lst);
System.Diagnostics.Debug.Assert(status == 0);

var device_id = new List<string>() { };
for (var i = 0; i < 3; i++) device_id.Add(string.Format("{0}.{1}", test_group_name, test_device));
var measurements_lst = new List<List<string>>() { };
measurements_lst.Add(new List<string>() { test_measurements[1], test_measurements[2] });
measurements_lst.Add(new List<string>() { test_measurements[1], test_measurements[2] });
measurements_lst.Add(new List<string>() { test_measurements[1], test_measurements[2] });
var values_lst = new List<List<string>>() { };
values_lst.Add(new List<string>() { "test1", "test2" });
values_lst.Add(new List<string>() { "test3", "test4" });
values_lst.Add(new List<string>() { "test5", "test6" });
List<long> timestamp_lst = new List<long>() { 1, 2, 3 };

status = await session_pool.InsertAlignedStringRecordsAsync(device_id, measurements_lst, values_lst, timestamp_lst);
System.Diagnostics.Debug.Assert(status == 0);
var res = await session_pool.ExecuteQueryStatementAsync(
"select * from " + string.Format("{0}.{1}", test_group_name, test_device) + " where time<10");
res.ShowTableNames();
while (res.HasNext()) Console.WriteLine(res.Next());

await res.Close();

// large data test
device_id = new List<string>() { };
measurements_lst = new List<List<string>>() { };
values_lst = new List<List<string>>() { };
timestamp_lst = new List<long>() { };
List<Task<int>> tasks = new List<Task<int>>();
for (var timestamp = 4; timestamp <= fetch_size * processed_size; timestamp++)
{
device_id.Add(string.Format("{0}.{1}", test_group_name, test_device));
measurements_lst.Add(new List<string>() { test_measurements[1], test_measurements[2] });
values_lst.Add(new List<string>() { "test1", "test2" });
timestamp_lst.Add(timestamp);
if (timestamp % fetch_size == 0)
{
tasks.Add(session_pool.InsertAlignedStringRecordsAsync(device_id, measurements_lst, values_lst, timestamp_lst));
device_id = new List<string>() { };
measurements_lst = new List<List<string>>() { };
values_lst = new List<List<string>>() { };
timestamp_lst = new List<long>() { };
}
}

Task.WaitAll(tasks.ToArray());
res = await session_pool.ExecuteQueryStatementAsync(
"select * from " + string.Format("{0}.{1}", test_group_name, test_device));
res.ShowTableNames();
var res_count = 0;
while (res.HasNext())
{
res.Next();
res_count += 1;
}

await res.Close();
Console.WriteLine(res_count + " " + fetch_size * processed_size);
System.Diagnostics.Debug.Assert(res_count == fetch_size * processed_size);
status = await session_pool.DeleteStorageGroupAsync(test_group_name);
System.Diagnostics.Debug.Assert(status == 0);
await session_pool.Close();
Console.WriteLine("TestInsertAlignedStringRecords Passed!");
}
public async Task TestInsertAlignedRecordsOfOneDevice()
{
var session_pool = new SessionPool(host, port, pool_size);
Expand Down Expand Up @@ -275,5 +402,79 @@ public async Task TestInsertAlignedRecordsOfOneDevice()
await session_pool.Close();
Console.WriteLine("TestInsertAlignedRecordsOfOneDevice Passed!");
}
public async Task TestInsertAlignedStringRecordsOfOneDevice()
{
var session_pool = new SessionPool(host, port, pool_size);
await session_pool.Open(false);
if (debug) session_pool.OpenDebugMode();

System.Diagnostics.Debug.Assert(session_pool.IsOpen());
var status = 0;
await session_pool.DeleteStorageGroupAsync(test_group_name);
var device_id = string.Format("{0}.{1}", test_group_name, test_device);
var measurements = new List<string>() { test_measurements[0], test_measurements[1], test_measurements[2] };
var data_type_lst = new List<TSDataType>() { TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT };
var encoding_lst = new List<TSEncoding>() { TSEncoding.PLAIN, TSEncoding.PLAIN, TSEncoding.PLAIN };
var compressor_lst = new List<Compressor>() { Compressor.SNAPPY, Compressor.SNAPPY, Compressor.SNAPPY };
status = await session_pool.CreateAlignedTimeseriesAsync(device_id, measurements, data_type_lst, encoding_lst, compressor_lst);
System.Diagnostics.Debug.Assert(status == 0);

var measurements_lst = new List<List<string>>() { };
measurements_lst.Add(new List<string>() { test_measurements[0], test_measurements[1], test_measurements[2] });
measurements_lst.Add(new List<string>() { test_measurements[0], test_measurements[1], test_measurements[2] });
measurements_lst.Add(new List<string>() { test_measurements[0], test_measurements[1], test_measurements[2] });

var values_lst = new List<List<string>>() { };
values_lst.Add(new List<string>() { "test1", "test2", "test3" });
values_lst.Add(new List<string>() { "test4", "test5", "test6" });
values_lst.Add(new List<string>() { "test7", "test8", "test9" });

var timestamp_lst = new List<long>() { 1, 2, 3 };

status = await session_pool.InsertAlignedStringRecordsOfOneDeviceAsync(device_id, timestamp_lst, measurements_lst, values_lst);
System.Diagnostics.Debug.Assert(status == 0);
var res = await session_pool.ExecuteQueryStatementAsync(
"select * from " + string.Format("{0}.{1}", test_group_name, test_device) + " where time<10");
res.ShowTableNames();
while (res.HasNext()) Console.WriteLine(res.Next());

await res.Close();
// large data test
values_lst = new List<List<string>>() { };
var tasks = new List<Task<int>>();
measurements_lst = new List<List<string>>() { };
timestamp_lst = new List<long>() { };
for (var timestamp = 4; timestamp <= fetch_size * processed_size; timestamp++)
{
values_lst.Add(new List<string>() { "test1", "test2" });
measurements_lst.Add(new List<string>() { test_measurements[1], test_measurements[2] });
timestamp_lst.Add(timestamp);
if (timestamp % fetch_size == 0)
{
tasks.Add(session_pool.InsertAlignedStringRecordsOfOneDeviceAsync(device_id, timestamp_lst, measurements_lst, values_lst));
values_lst = new List<List<string>>() { };
measurements_lst = new List<List<string>>() { };
timestamp_lst = new List<long>() { };
}
}

Task.WaitAll(tasks.ToArray());
res = await session_pool.ExecuteQueryStatementAsync(
"select * from " + string.Format("{0}.{1}", test_group_name, test_device));
var res_count = 0;
while (res.HasNext())
{
res.Next();
res_count += 1;
}

await res.Close();
Console.WriteLine(res_count + " " + fetch_size * processed_size);
System.Diagnostics.Debug.Assert(res_count == fetch_size * processed_size);
status = await session_pool.DeleteStorageGroupAsync(test_group_name);
System.Diagnostics.Debug.Assert(status == 0);
await session_pool.Close();
Console.WriteLine("TestInsertAlignedStringRecordsOfOneDevice Passed!");
}
}
}
Loading

0 comments on commit 604e0d6

Please sign in to comment.