From 5ae3092e91edeee944336c6c8d3d2025b2c52966 Mon Sep 17 00:00:00 2001 From: neptunian Date: Wed, 7 Oct 2020 13:41:16 -0400 Subject: [PATCH 1/5] build datastream name from index name --- .../epm/elasticsearch/template/template.ts | 20 ++----------------- 1 file changed, 2 insertions(+), 18 deletions(-) diff --git a/x-pack/plugins/ingest_manager/server/services/epm/elasticsearch/template/template.ts b/x-pack/plugins/ingest_manager/server/services/epm/elasticsearch/template/template.ts index e0fea59107c267..561108f96869d7 100644 --- a/x-pack/plugins/ingest_manager/server/services/epm/elasticsearch/template/template.ts +++ b/x-pack/plugins/ingest_manager/server/services/epm/elasticsearch/template/template.ts @@ -400,13 +400,8 @@ const updateExistingIndex = async ({ delete mappings.properties.stream; delete mappings.properties.data_stream; - // get the data_stream values from the index template to compose data stream name - const indexMappings = await getIndexMappings(indexName, callCluster); - const dataStream = indexMappings[indexName].mappings.properties.data_stream.properties; - if (!dataStream.type.value || !dataStream.dataset.value || !dataStream.namespace.value) - throw new Error(`data_stream values are missing from the index template ${indexName}`); - const dataStreamName = `${dataStream.type.value}-${dataStream.dataset.value}-${dataStream.namespace.value}`; - + const [, dsType, dsTemplateName, dsNamespace] = indexName.split('-'); + const dataStreamName = `${dsType}-${dsTemplateName}-${dsNamespace}`; // try to update the mappings first try { await callCluster('indices.putMapping', { @@ -438,14 +433,3 @@ const updateExistingIndex = async ({ throw new Error(`could not update index template settings for ${indexName}`); } }; - -const getIndexMappings = async (indexName: string, callCluster: CallESAsCurrentUser) => { - try { - const indexMappings = await callCluster('indices.getMapping', { - index: indexName, - }); - return indexMappings; - } catch (err) { - throw new Error(`could not get mapping from ${indexName}`); - } -}; From edb3cc33ae91d458a440ede00118f813febf46f1 Mon Sep 17 00:00:00 2001 From: neptunian Date: Thu, 8 Oct 2020 14:39:24 -0400 Subject: [PATCH 2/5] query for data_stream constants to create data stream name --- .../ingest_manager/common/types/models/epm.ts | 5 +++ .../epm/elasticsearch/template/template.ts | 45 +++++++++++++++++-- .../epm/kibana/index_pattern/install.ts | 18 ++++---- .../ingest_manager/server/types/index.tsx | 1 + 4 files changed, 58 insertions(+), 11 deletions(-) diff --git a/x-pack/plugins/ingest_manager/common/types/models/epm.ts b/x-pack/plugins/ingest_manager/common/types/models/epm.ts index ea7fd60d1fa3f8..2ec9d7be6c8824 100644 --- a/x-pack/plugins/ingest_manager/common/types/models/epm.ts +++ b/x-pack/plugins/ingest_manager/common/types/models/epm.ts @@ -44,6 +44,11 @@ export enum ElasticsearchAssetType { transform = 'transform', } +export enum DataType { + logs = 'logs', + metrics = 'metrics', +} + export enum AgentAssetType { input = 'input', } diff --git a/x-pack/plugins/ingest_manager/server/services/epm/elasticsearch/template/template.ts b/x-pack/plugins/ingest_manager/server/services/epm/elasticsearch/template/template.ts index 561108f96869d7..9391474bf4ccb7 100644 --- a/x-pack/plugins/ingest_manager/server/services/epm/elasticsearch/template/template.ts +++ b/x-pack/plugins/ingest_manager/server/services/epm/elasticsearch/template/template.ts @@ -11,6 +11,7 @@ import { TemplateRef, IndexTemplate, IndexTemplateMappings, + DataType, } from '../../../../types'; import { getRegistryDataStreamAssetBaseName } from '../index'; @@ -400,8 +401,6 @@ const updateExistingIndex = async ({ delete mappings.properties.stream; delete mappings.properties.data_stream; - const [, dsType, dsTemplateName, dsNamespace] = indexName.split('-'); - const dataStreamName = `${dsType}-${dsTemplateName}-${dsNamespace}`; // try to update the mappings first try { await callCluster('indices.putMapping', { @@ -411,13 +410,53 @@ const updateExistingIndex = async ({ // if update fails, rollover data stream } catch (err) { try { + // get the data_stream values to compose datastream name + const searchDataStreamFieldsResponse = await callCluster('search', { + index: indexTemplate.index_patterns[0], + body: { + size: 1, + query: { + bool: { + must: [ + { + exists: { + field: 'data_stream.type', + }, + }, + { + exists: { + field: 'data_stream.dataset', + }, + }, + { + exists: { + field: 'data_stream.namespace', + }, + }, + ], + }, + }, + }, + }); + if (searchDataStreamFieldsResponse.hits.total.value === 0) + throw new Error('data_stream fields are missing from datastream indices'); + const { + dataset, + namespace, + type, + }: { + dataset: string; + namespace: string; + type: DataType; + } = searchDataStreamFieldsResponse.hits.hits[0]._source.data_stream; + const dataStreamName = `${type}-${dataset}-${namespace}`; const path = `/${dataStreamName}/_rollover`; await callCluster('transport.request', { method: 'POST', path, }); } catch (error) { - throw new Error(`cannot rollover data stream ${dataStreamName}`); + throw new Error(`cannot rollover data stream ${error}`); } } // update settings after mappings was successful to ensure diff --git a/x-pack/plugins/ingest_manager/server/services/epm/kibana/index_pattern/install.ts b/x-pack/plugins/ingest_manager/server/services/epm/kibana/index_pattern/install.ts index 2aa28d23cf8575..6238df611642e2 100644 --- a/x-pack/plugins/ingest_manager/server/services/epm/kibana/index_pattern/install.ts +++ b/x-pack/plugins/ingest_manager/server/services/epm/kibana/index_pattern/install.ts @@ -12,7 +12,12 @@ import { import * as Registry from '../../registry'; import { loadFieldsFromYaml, Fields, Field } from '../../fields/field'; import { getPackageKeysByStatus } from '../../packages/get'; -import { InstallationStatus, RegistryPackage, CallESAsCurrentUser } from '../../../../types'; +import { + InstallationStatus, + RegistryPackage, + CallESAsCurrentUser, + DataType, +} from '../../../../types'; import { appContextService } from '../../../../services'; interface FieldFormatMap { @@ -69,10 +74,7 @@ export interface IndexPatternField { lang?: string; readFromDocValues: boolean; } -export enum IndexPatternType { - logs = 'logs', - metrics = 'metrics', -} + // TODO: use a function overload and make pkgName and pkgVersion required for install/update // and not for an update removal. or separate out the functions export async function installIndexPatterns( @@ -112,7 +114,7 @@ export async function installIndexPatterns( const installedPackagesInfo = await Promise.all(installedPackagesFetchInfoPromise); // for each index pattern type, create an index pattern - const indexPatternTypes = [IndexPatternType.logs, IndexPatternType.metrics]; + const indexPatternTypes = [DataType.logs, DataType.metrics]; indexPatternTypes.forEach(async (indexPatternType) => { // if this is an update because a package is being unisntalled (no pkgkey argument passed) and no other packages are installed, remove the index pattern if (!pkgName && installedPackages.length === 0) { @@ -140,7 +142,7 @@ export async function installIndexPatterns( // of all fields from all data streams matching data stream type export const getAllDataStreamFieldsByType = async ( packages: RegistryPackage[], - dataStreamType: IndexPatternType + dataStreamType: DataType ): Promise => { const dataStreamsPromises = packages.reduce>>((acc, pkg) => { if (pkg.data_streams) { @@ -385,7 +387,7 @@ export const ensureDefaultIndices = async (callCluster: CallESAsCurrentUser) => // that no matching indices exist https://github.com/elastic/kibana/issues/62343 const logger = appContextService.getLogger(); return Promise.all( - Object.keys(IndexPatternType).map(async (indexPattern) => { + Object.keys(DataType).map(async (indexPattern) => { const defaultIndexPatternName = indexPattern + INDEX_PATTERN_PLACEHOLDER_SUFFIX; const indexExists = await callCluster('indices.exists', { index: defaultIndexPatternName }); if (!indexExists) { diff --git a/x-pack/plugins/ingest_manager/server/types/index.tsx b/x-pack/plugins/ingest_manager/server/types/index.tsx index 0c070959e3b930..7d841ed024ce51 100644 --- a/x-pack/plugins/ingest_manager/server/types/index.tsx +++ b/x-pack/plugins/ingest_manager/server/types/index.tsx @@ -73,6 +73,7 @@ export { // Agent Request types PostAgentEnrollRequest, PostAgentCheckinRequest, + DataType, } from '../../common'; export type CallESAsCurrentUser = LegacyScopedClusterClient['callAsCurrentUser']; From ca03ef3298d62b48c14db65f47f734c164a5db4e Mon Sep 17 00:00:00 2001 From: neptunian Date: Mon, 12 Oct 2020 12:28:45 -0400 Subject: [PATCH 3/5] simply datastream tests and add a test to upgrade after a datastream rolls over --- .../apis/epm/data_stream.ts | 102 +++++++++--------- 1 file changed, 50 insertions(+), 52 deletions(-) diff --git a/x-pack/test/ingest_manager_api_integration/apis/epm/data_stream.ts b/x-pack/test/ingest_manager_api_integration/apis/epm/data_stream.ts index 5da9b5e3031b24..57d3a642357797 100644 --- a/x-pack/test/ingest_manager_api_integration/apis/epm/data_stream.ts +++ b/x-pack/test/ingest_manager_api_integration/apis/epm/data_stream.ts @@ -24,15 +24,17 @@ export default function (providerContext: FtrProviderContext) { await supertest.delete(`/api/fleet/epm/packages/${pkg}`).set('kbn-xsrf', 'xxxx'); }; const installPackage = async (pkg: string) => { - await supertest + return await supertest .post(`/api/fleet/epm/packages/${pkg}`) .set('kbn-xsrf', 'xxxx') - .send({ force: true }); + .send({ force: true }) + .expect(200); }; describe('datastreams', async () => { skipIfNoDockerRegistry(providerContext); - before(async () => { + skipIfNoDockerRegistry(providerContext); + beforeEach(async () => { await installPackage(pkgKey); await es.transport.request({ method: 'POST', @@ -61,8 +63,7 @@ export default function (providerContext: FtrProviderContext) { }, }); }); - after(async () => { - await uninstallPackage(pkgUpdateKey); + afterEach(async () => { await es.transport.request({ method: 'DELETE', path: `/_data_stream/${logsTemplateName}-default`, @@ -71,60 +72,57 @@ export default function (providerContext: FtrProviderContext) { method: 'DELETE', path: `/_data_stream/${metricsTemplateName}-default`, }); + await uninstallPackage(pkgKey); + await uninstallPackage(pkgUpdateKey); }); - describe('get datastreams after data sent', async () => { - skipIfNoDockerRegistry(providerContext); - let resLogsDatastream: any; - let resMetricsDatastream: any; - before(async () => { - resLogsDatastream = await es.transport.request({ - method: 'GET', - path: `/_data_stream/${logsTemplateName}-default`, - }); - resMetricsDatastream = await es.transport.request({ - method: 'GET', - path: `/_data_stream/${metricsTemplateName}-default`, - }); - }); - it('should list the logs datastream', async function () { - expect(resLogsDatastream.body.data_streams.length).equal(1); - expect(resLogsDatastream.body.data_streams[0].indices.length).equal(1); - expect(resLogsDatastream.body.data_streams[0].indices[0].index_name).equal( - `.ds-${logsTemplateName}-default-000001` - ); + it('should list the logs and metrics datastream', async function () { + const resLogsDatastream = await es.transport.request({ + method: 'GET', + path: `/_data_stream/${logsTemplateName}-default`, }); - it('should list the metrics datastream', async function () { - expect(resMetricsDatastream.body.data_streams.length).equal(1); - expect(resMetricsDatastream.body.data_streams[0].indices.length).equal(1); - expect(resMetricsDatastream.body.data_streams[0].indices[0].index_name).equal( - `.ds-${metricsTemplateName}-default-000001` - ); + const resMetricsDatastream = await es.transport.request({ + method: 'GET', + path: `/_data_stream/${metricsTemplateName}-default`, }); + expect(resLogsDatastream.body.data_streams.length).equal(1); + expect(resLogsDatastream.body.data_streams[0].indices.length).equal(1); + expect(resLogsDatastream.body.data_streams[0].indices[0].index_name).equal( + `.ds-${logsTemplateName}-default-000001` + ); + expect(resMetricsDatastream.body.data_streams.length).equal(1); + expect(resMetricsDatastream.body.data_streams[0].indices.length).equal(1); + expect(resMetricsDatastream.body.data_streams[0].indices[0].index_name).equal( + `.ds-${metricsTemplateName}-default-000001` + ); }); - describe('rollover datastream when mappings are not compatible', async () => { - skipIfNoDockerRegistry(providerContext); - let resLogsDatastream: any; - let resMetricsDatastream: any; - before(async () => { - await installPackage(pkgUpdateKey); - resLogsDatastream = await es.transport.request({ - method: 'GET', - path: `/_data_stream/${logsTemplateName}-default`, - }); - resMetricsDatastream = await es.transport.request({ - method: 'GET', - path: `/_data_stream/${metricsTemplateName}-default`, - }); + + it('after update, it should have rolled over logs datastream because mappings are not compatible and not metrics', async function () { + await installPackage(pkgUpdateKey); + const resLogsDatastream = await es.transport.request({ + method: 'GET', + path: `/_data_stream/${logsTemplateName}-default`, }); - it('should have rolled over logs datastream', async function () { - expect(resLogsDatastream.body.data_streams[0].indices.length).equal(2); - expect(resLogsDatastream.body.data_streams[0].indices[1].index_name).equal( - `.ds-${logsTemplateName}-default-000002` - ); + const resMetricsDatastream = await es.transport.request({ + method: 'GET', + path: `/_data_stream/${metricsTemplateName}-default`, }); - it('should have not rolled over metrics datastream', async function () { - expect(resMetricsDatastream.body.data_streams[0].indices.length).equal(1); + expect(resLogsDatastream.body.data_streams[0].indices.length).equal(2); + expect(resLogsDatastream.body.data_streams[0].indices[1].index_name).equal( + `.ds-${logsTemplateName}-default-000002` + ); + expect(resMetricsDatastream.body.data_streams[0].indices.length).equal(1); + }); + it('should be able to upgrade a package after a rollover', async function () { + await es.transport.request({ + method: 'POST', + path: `/${logsTemplateName}-default/_rollover`, + }); + const resLogsDatastream = await es.transport.request({ + method: 'GET', + path: `/_data_stream/${logsTemplateName}-default`, }); + expect(resLogsDatastream.body.data_streams[0].indices.length).equal(2); + await installPackage(pkgUpdateKey); }); }); } From 82663de77b6b15ce832b20426c2de0974e1d940e Mon Sep 17 00:00:00 2001 From: neptunian Date: Mon, 12 Oct 2020 13:14:35 -0400 Subject: [PATCH 4/5] improve query --- .../server/services/epm/elasticsearch/template/template.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/plugins/ingest_manager/server/services/epm/elasticsearch/template/template.ts b/x-pack/plugins/ingest_manager/server/services/epm/elasticsearch/template/template.ts index 9391474bf4ccb7..8d33180d6262dd 100644 --- a/x-pack/plugins/ingest_manager/server/services/epm/elasticsearch/template/template.ts +++ b/x-pack/plugins/ingest_manager/server/services/epm/elasticsearch/template/template.ts @@ -415,9 +415,10 @@ const updateExistingIndex = async ({ index: indexTemplate.index_patterns[0], body: { size: 1, + _source: ['data_stream.namespace', 'data_stream.type', 'data_stream.dataset'], query: { bool: { - must: [ + filter: [ { exists: { field: 'data_stream.type', From 7ed638fcbe47de58bf17619a220eb2c881de04f2 Mon Sep 17 00:00:00 2001 From: neptunian Date: Mon, 12 Oct 2020 15:09:24 -0400 Subject: [PATCH 5/5] remove dup --- .../test/ingest_manager_api_integration/apis/epm/data_stream.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/test/ingest_manager_api_integration/apis/epm/data_stream.ts b/x-pack/test/ingest_manager_api_integration/apis/epm/data_stream.ts index 57d3a642357797..b9558240ca0071 100644 --- a/x-pack/test/ingest_manager_api_integration/apis/epm/data_stream.ts +++ b/x-pack/test/ingest_manager_api_integration/apis/epm/data_stream.ts @@ -32,7 +32,6 @@ export default function (providerContext: FtrProviderContext) { }; describe('datastreams', async () => { - skipIfNoDockerRegistry(providerContext); skipIfNoDockerRegistry(providerContext); beforeEach(async () => { await installPackage(pkgKey);