Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Ingest Manager] Fix package upgrade breaking after first rollover before new data has arrived #79887

Merged
merged 6 commits into from
Oct 12, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions x-pack/plugins/ingest_manager/common/types/models/epm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ export enum ElasticsearchAssetType {
transform = 'transform',
}

export enum DataType {
logs = 'logs',
metrics = 'metrics',
}

export enum AgentAssetType {
input = 'input',
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
TemplateRef,
IndexTemplate,
IndexTemplateMappings,
DataType,
} from '../../../../types';
import { getRegistryDataStreamAssetBaseName } from '../index';

Expand Down Expand Up @@ -400,13 +401,6 @@ const updateExistingIndex = async ({
delete mappings.properties.stream;
delete mappings.properties.data_stream;

// get the data_stream values from the index template to compose data stream name
const indexMappings = await getIndexMappings(indexName, callCluster);
const dataStream = indexMappings[indexName].mappings.properties.data_stream.properties;
if (!dataStream.type.value || !dataStream.dataset.value || !dataStream.namespace.value)
throw new Error(`data_stream values are missing from the index template ${indexName}`);
const dataStreamName = `${dataStream.type.value}-${dataStream.dataset.value}-${dataStream.namespace.value}`;

// try to update the mappings first
try {
await callCluster('indices.putMapping', {
Expand All @@ -416,13 +410,54 @@ const updateExistingIndex = async ({
// if update fails, rollover data stream
} catch (err) {
try {
// get the data_stream values to compose datastream name
const searchDataStreamFieldsResponse = await callCluster('search', {
neptunian marked this conversation as resolved.
Show resolved Hide resolved
index: indexTemplate.index_patterns[0],
body: {
size: 1,
_source: ['data_stream.namespace', 'data_stream.type', 'data_stream.dataset'],
query: {
bool: {
filter: [
{
exists: {
field: 'data_stream.type',
},
},
{
exists: {
field: 'data_stream.dataset',
},
},
{
exists: {
field: 'data_stream.namespace',
},
},
],
},
},
},
});
if (searchDataStreamFieldsResponse.hits.total.value === 0)
throw new Error('data_stream fields are missing from datastream indices');
const {
dataset,
namespace,
type,
}: {
dataset: string;
namespace: string;
type: DataType;
} = searchDataStreamFieldsResponse.hits.hits[0]._source.data_stream;
const dataStreamName = `${type}-${dataset}-${namespace}`;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: do we want to check that type is a valid DataType? I suppose if they ingested data initially with an invalid data type then we should just continue doing that 🤷

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, i'm not sure this is the place to be validating that and whether we'd have to throw an error and stop the upgrade because of that.

const path = `/${dataStreamName}/_rollover`;
await callCluster('transport.request', {
method: 'POST',
path,
});
} catch (error) {
throw new Error(`cannot rollover data stream ${dataStreamName}`);
throw new Error(`cannot rollover data stream ${error}`);
}
}
// update settings after mappings was successful to ensure
Expand All @@ -438,14 +473,3 @@ const updateExistingIndex = async ({
throw new Error(`could not update index template settings for ${indexName}`);
}
};

const getIndexMappings = async (indexName: string, callCluster: CallESAsCurrentUser) => {
try {
const indexMappings = await callCluster('indices.getMapping', {
index: indexName,
});
return indexMappings;
} catch (err) {
throw new Error(`could not get mapping from ${indexName}`);
}
};
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@ import {
import * as Registry from '../../registry';
import { loadFieldsFromYaml, Fields, Field } from '../../fields/field';
import { getPackageKeysByStatus } from '../../packages/get';
import { InstallationStatus, RegistryPackage, CallESAsCurrentUser } from '../../../../types';
import {
InstallationStatus,
RegistryPackage,
CallESAsCurrentUser,
DataType,
} from '../../../../types';
import { appContextService } from '../../../../services';

interface FieldFormatMap {
Expand Down Expand Up @@ -69,10 +74,7 @@ export interface IndexPatternField {
lang?: string;
readFromDocValues: boolean;
}
export enum IndexPatternType {
logs = 'logs',
metrics = 'metrics',
}

// TODO: use a function overload and make pkgName and pkgVersion required for install/update
// and not for an update removal. or separate out the functions
export async function installIndexPatterns(
Expand Down Expand Up @@ -117,7 +119,7 @@ export async function installIndexPatterns(
const packageVersionsInfo = await Promise.all(packageVersionsFetchInfoPromise);

// for each index pattern type, create an index pattern
const indexPatternTypes = [IndexPatternType.logs, IndexPatternType.metrics];
const indexPatternTypes = [DataType.logs, DataType.metrics];
indexPatternTypes.forEach(async (indexPatternType) => {
// if this is an update because a package is being uninstalled (no pkgkey argument passed) and no other packages are installed, remove the index pattern
if (!pkgName && installedPackages.length === 0) {
Expand All @@ -144,7 +146,7 @@ export async function installIndexPatterns(
// of all fields from all data streams matching data stream type
export const getAllDataStreamFieldsByType = async (
packages: RegistryPackage[],
dataStreamType: IndexPatternType
dataStreamType: DataType
): Promise<Fields> => {
const dataStreamsPromises = packages.reduce<Array<Promise<Field[]>>>((acc, pkg) => {
if (pkg.data_streams) {
Expand Down Expand Up @@ -389,7 +391,7 @@ export const ensureDefaultIndices = async (callCluster: CallESAsCurrentUser) =>
// that no matching indices exist https://github.com/elastic/kibana/issues/62343
const logger = appContextService.getLogger();
return Promise.all(
Object.keys(IndexPatternType).map(async (indexPattern) => {
Object.keys(DataType).map(async (indexPattern) => {
const defaultIndexPatternName = indexPattern + INDEX_PATTERN_PLACEHOLDER_SUFFIX;
const indexExists = await callCluster('indices.exists', { index: defaultIndexPatternName });
if (!indexExists) {
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/ingest_manager/server/types/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ export {
// Agent Request types
PostAgentEnrollRequest,
PostAgentCheckinRequest,
DataType,
} from '../../common';

export type CallESAsCurrentUser = LegacyScopedClusterClient['callAsCurrentUser'];
Expand Down
102 changes: 50 additions & 52 deletions x-pack/test/ingest_manager_api_integration/apis/epm/data_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,17 @@ export default function (providerContext: FtrProviderContext) {
await supertest.delete(`/api/fleet/epm/packages/${pkg}`).set('kbn-xsrf', 'xxxx');
};
const installPackage = async (pkg: string) => {
await supertest
return await supertest
.post(`/api/fleet/epm/packages/${pkg}`)
.set('kbn-xsrf', 'xxxx')
.send({ force: true });
.send({ force: true })
.expect(200);
};

describe('datastreams', async () => {
skipIfNoDockerRegistry(providerContext);
before(async () => {
skipIfNoDockerRegistry(providerContext);
neptunian marked this conversation as resolved.
Show resolved Hide resolved
beforeEach(async () => {
await installPackage(pkgKey);
await es.transport.request({
method: 'POST',
Expand Down Expand Up @@ -61,8 +63,7 @@ export default function (providerContext: FtrProviderContext) {
},
});
});
after(async () => {
await uninstallPackage(pkgUpdateKey);
afterEach(async () => {
await es.transport.request({
method: 'DELETE',
path: `/_data_stream/${logsTemplateName}-default`,
Expand All @@ -71,60 +72,57 @@ export default function (providerContext: FtrProviderContext) {
method: 'DELETE',
path: `/_data_stream/${metricsTemplateName}-default`,
});
await uninstallPackage(pkgKey);
await uninstallPackage(pkgUpdateKey);
});
describe('get datastreams after data sent', async () => {
skipIfNoDockerRegistry(providerContext);
let resLogsDatastream: any;
let resMetricsDatastream: any;
before(async () => {
resLogsDatastream = await es.transport.request({
method: 'GET',
path: `/_data_stream/${logsTemplateName}-default`,
});
resMetricsDatastream = await es.transport.request({
method: 'GET',
path: `/_data_stream/${metricsTemplateName}-default`,
});
});
it('should list the logs datastream', async function () {
expect(resLogsDatastream.body.data_streams.length).equal(1);
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(1);
expect(resLogsDatastream.body.data_streams[0].indices[0].index_name).equal(
`.ds-${logsTemplateName}-default-000001`
);
it('should list the logs and metrics datastream', async function () {
const resLogsDatastream = await es.transport.request({
method: 'GET',
path: `/_data_stream/${logsTemplateName}-default`,
});
it('should list the metrics datastream', async function () {
expect(resMetricsDatastream.body.data_streams.length).equal(1);
expect(resMetricsDatastream.body.data_streams[0].indices.length).equal(1);
expect(resMetricsDatastream.body.data_streams[0].indices[0].index_name).equal(
`.ds-${metricsTemplateName}-default-000001`
);
const resMetricsDatastream = await es.transport.request({
method: 'GET',
path: `/_data_stream/${metricsTemplateName}-default`,
});
expect(resLogsDatastream.body.data_streams.length).equal(1);
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(1);
expect(resLogsDatastream.body.data_streams[0].indices[0].index_name).equal(
`.ds-${logsTemplateName}-default-000001`
);
expect(resMetricsDatastream.body.data_streams.length).equal(1);
expect(resMetricsDatastream.body.data_streams[0].indices.length).equal(1);
expect(resMetricsDatastream.body.data_streams[0].indices[0].index_name).equal(
`.ds-${metricsTemplateName}-default-000001`
);
});
describe('rollover datastream when mappings are not compatible', async () => {
skipIfNoDockerRegistry(providerContext);
let resLogsDatastream: any;
let resMetricsDatastream: any;
before(async () => {
await installPackage(pkgUpdateKey);
resLogsDatastream = await es.transport.request({
method: 'GET',
path: `/_data_stream/${logsTemplateName}-default`,
});
resMetricsDatastream = await es.transport.request({
method: 'GET',
path: `/_data_stream/${metricsTemplateName}-default`,
});

it('after update, it should have rolled over logs datastream because mappings are not compatible and not metrics', async function () {
await installPackage(pkgUpdateKey);
const resLogsDatastream = await es.transport.request({
method: 'GET',
path: `/_data_stream/${logsTemplateName}-default`,
});
it('should have rolled over logs datastream', async function () {
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(2);
expect(resLogsDatastream.body.data_streams[0].indices[1].index_name).equal(
`.ds-${logsTemplateName}-default-000002`
);
const resMetricsDatastream = await es.transport.request({
method: 'GET',
path: `/_data_stream/${metricsTemplateName}-default`,
});
it('should have not rolled over metrics datastream', async function () {
expect(resMetricsDatastream.body.data_streams[0].indices.length).equal(1);
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(2);
expect(resLogsDatastream.body.data_streams[0].indices[1].index_name).equal(
`.ds-${logsTemplateName}-default-000002`
);
expect(resMetricsDatastream.body.data_streams[0].indices.length).equal(1);
});
it('should be able to upgrade a package after a rollover', async function () {
await es.transport.request({
method: 'POST',
path: `/${logsTemplateName}-default/_rollover`,
});
const resLogsDatastream = await es.transport.request({
method: 'GET',
path: `/_data_stream/${logsTemplateName}-default`,
});
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(2);
await installPackage(pkgUpdateKey);
});
});
}