
Update logstash pipeline management to use system index APIs #80405

Merged 10 commits on Nov 11, 2020

Three files were deleted.

21 changes: 9 additions & 12 deletions x-pack/plugins/logstash/server/models/pipeline/pipeline.test.ts
@@ -10,8 +10,7 @@ describe('pipeline', () => {
describe('Pipeline', () => {
describe('fromUpstreamJSON factory method', () => {
const upstreamJSON = {
_id: 'apache',
_source: {
apache: {
description: 'this is an apache pipeline',
pipeline_metadata: {
version: 1,
@@ -21,25 +20,23 @@
pipeline: 'input {} filter { grok {} }\n output {}',
},
};
const upstreamId = 'apache';

it('returns correct Pipeline instance', () => {
const pipeline = Pipeline.fromUpstreamJSON(upstreamJSON);
expect(pipeline.id).toBe(upstreamJSON._id);
expect(pipeline.description).toBe(upstreamJSON._source.description);
expect(pipeline.username).toBe(upstreamJSON._source.username);
expect(pipeline.pipeline).toBe(upstreamJSON._source.pipeline);
expect(pipeline.id).toBe(upstreamId);
expect(pipeline.description).toBe(upstreamJSON.apache.description);
expect(pipeline.username).toBe(upstreamJSON.apache.username);
expect(pipeline.pipeline).toBe(upstreamJSON.apache.pipeline);
});

it('throws if pipeline argument does not contain an id property', () => {
const badJSON = {
// no _id
_source: upstreamJSON._source,
};
it('throws if pipeline argument does not contain id as a key', () => {
const badJSON = {};
const testFromUpstreamJsonError = () => {
return Pipeline.fromUpstreamJSON(badJSON);
};
expect(testFromUpstreamJsonError).toThrowError(
/upstreamPipeline argument must contain an id property/i
/upstreamPipeline argument must contain pipeline id as a key/i
);
});
});
14 changes: 7 additions & 7 deletions x-pack/plugins/logstash/server/models/pipeline/pipeline.ts
@@ -93,21 +93,21 @@ export class Pipeline {

// generate Pipeline object from elasticsearch response
static fromUpstreamJSON(upstreamPipeline: Record<string, any>) {
if (!upstreamPipeline._id) {
if (Object.keys(upstreamPipeline).length !== 1) {
throw badRequest(
i18n.translate(
'xpack.logstash.upstreamPipelineArgumentMustContainAnIdPropertyErrorMessage',
{
defaultMessage: 'upstreamPipeline argument must contain an id property',
defaultMessage: 'upstreamPipeline argument must contain pipeline id as a key',
}
)
);
}
const id = get(upstreamPipeline, '_id') as string;
const description = get(upstreamPipeline, '_source.description') as string;
const username = get(upstreamPipeline, '_source.username') as string;
const pipeline = get(upstreamPipeline, '_source.pipeline') as string;
const settings = get(upstreamPipeline, '_source.pipeline_settings') as Record<string, any>;
const id = Object.keys(upstreamPipeline).pop() as string;
const description = get(upstreamPipeline, id + '.description') as string;
const username = get(upstreamPipeline, id + '.username') as string;
const pipeline = get(upstreamPipeline, id + '.pipeline') as string;
const settings = get(upstreamPipeline, id + '.pipeline_settings') as Record<string, any>;

const opts: PipelineOptions = { id, description, username, pipeline, settings };

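The shape change that drives this refactor: GET /_logstash/pipeline/<id> returns the pipeline document keyed by its id instead of a search hit with _id and _source. A minimal usage sketch of the new factory, assuming the relative import path and using illustrative values borrowed from the test fixture above:

```ts
import { Pipeline } from './pipeline';

// Response shape of GET /_logstash/pipeline/apache: the pipeline id is the
// single top-level key, and the stored fields hang directly off it.
const upstreamPipeline: Record<string, any> = {
  apache: {
    description: 'this is an apache pipeline',
    username: 'elastic',
    pipeline: 'input {} filter { grok {} }\n output {}',
    pipeline_settings: { 'pipeline.workers': 1 }, // illustrative settings value
  },
};

// fromUpstreamJSON now derives the id from that single key and reads the
// remaining fields from under it, instead of from _id/_source.
const pipeline = Pipeline.fromUpstreamJSON(upstreamPipeline);
console.log(pipeline.id); // 'apache'
console.log(pipeline.description); // 'this is an apache pipeline'
```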
@@ -9,8 +9,7 @@ import { PipelineListItem } from './pipeline_list_item';
describe('pipeline_list_item', () => {
describe('PipelineListItem', () => {
const upstreamJSON = {
_id: 'apache',
_source: {
apache: {
description: 'this is an apache pipeline',
last_modified: '2017-05-14T02:50:51.250Z',
pipeline_metadata: {
@@ -20,24 +19,22 @@
username: 'elastic',
pipeline: 'input {} filter { grok {} }\n output {}',
},
_index: 'index',
_type: 'type',
_score: 100,
};
const upstreamId = 'apache';

describe('fromUpstreamJSON factory method', () => {
it('returns correct PipelineListItem instance', () => {
const pipelineListItem = PipelineListItem.fromUpstreamJSON(upstreamJSON);
expect(pipelineListItem.id).toBe(upstreamJSON._id);
expect(pipelineListItem.description).toBe(upstreamJSON._source.description);
expect(pipelineListItem.username).toBe(upstreamJSON._source.username);
expect(pipelineListItem.last_modified).toBe(upstreamJSON._source.last_modified);
const pipelineListItem = PipelineListItem.fromUpstreamJSON(upstreamId, upstreamJSON);
expect(pipelineListItem.id).toBe(upstreamId);
expect(pipelineListItem.description).toBe(upstreamJSON.apache.description);
expect(pipelineListItem.username).toBe(upstreamJSON.apache.username);
expect(pipelineListItem.last_modified).toBe(upstreamJSON.apache.last_modified);
});
});

describe('downstreamJSON getter method', () => {
it('returns the downstreamJSON JSON', () => {
const pipelineListItem = PipelineListItem.fromUpstreamJSON(upstreamJSON);
const pipelineListItem = PipelineListItem.fromUpstreamJSON(upstreamId, upstreamJSON);
const expectedDownstreamJSON = {
id: 'apache',
description: 'this is an apache pipeline',
@@ -5,7 +5,7 @@
*/

import { get } from 'lodash';
import { Hit, PipelineListItemOptions } from '../../types';
import { PipelineListItemOptions } from '../../types';

export class PipelineListItem {
public readonly id: string;
@@ -34,12 +34,12 @@ export class PipelineListItem {
* Takes the json GET response from ES and constructs a pipeline model to be used
* in Kibana downstream
*/
static fromUpstreamJSON(pipeline: Hit) {
static fromUpstreamJSON(id: string, pipeline: Record<string, any>) {
const opts = {
id: pipeline._id,
description: get(pipeline, '_source.description') as string,
last_modified: get(pipeline, '_source.last_modified') as string,
username: get(pipeline, '_source.username') as string,
id,
description: get(pipeline, id + '.description') as string,
last_modified: get(pipeline, id + '.last_modified') as string,
username: get(pipeline, id + '.username') as string,
};

return new PipelineListItem(opts);
6 changes: 2 additions & 4 deletions x-pack/plugins/logstash/server/plugin.ts
@@ -44,10 +44,8 @@ export class LogstashPlugin implements Plugin {
},
privileges: [
{
requiredClusterPrivileges: [],
requiredIndexPrivileges: {
['.logstash']: ['read'],
},
requiredClusterPrivileges: ['manage_logstash_pipelines'],
requiredIndexPrivileges: {},
ui: [],
},
],
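One operational consequence of this privilege swap: custom Elasticsearch roles that previously granted read on the .logstash index now need the manage_logstash_pipelines cluster privilege instead. A hedged sketch of creating such a role with the elasticsearch-js security API; the role name and client setup are made up for illustration:

```ts
import { Client } from '@elastic/elasticsearch';

const client = new Client({ node: 'http://localhost:9200' });

async function createPipelineManagerRole() {
  // Grants the cluster privilege that the Logstash pipeline management
  // feature now checks for; no privileges on the .logstash index are needed.
  await client.security.putRole({
    name: 'logstash_pipeline_manager', // illustrative role name
    body: {
      cluster: ['manage_logstash_pipelines'],
      indices: [],
    },
  });
}
```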
8 changes: 3 additions & 5 deletions x-pack/plugins/logstash/server/routes/pipeline/delete.ts
@@ -5,7 +5,6 @@
*/
import { schema } from '@kbn/config-schema';
import { IRouter } from 'src/core/server';
import { INDEX_NAMES } from '../../../common/constants';
import { wrapRouteWithLicenseCheck } from '../../../../licensing/server';

import { checkLicense } from '../../lib/check_license';
@@ -25,10 +24,9 @@ export function registerPipelineDeleteRoute(router: IRouter) {
router.handleLegacyErrors(async (context, request, response) => {
const client = context.logstash!.esClient;

await client.callAsCurrentUser('delete', {
index: INDEX_NAMES.PIPELINES,
id: request.params.id,
refresh: 'wait_for',
await client.callAsCurrentUser('transport.request', {

Review comment from @mshustov (Contributor), Nov 10, 2020:

  1. The Logstash plugin needs to be migrated to the new Elasticsearch client (https://github.com/elastic/elasticsearch-js), as we are going to remove the old one in v8.0. Changelog: https://github.com/elastic/kibana/blob/master/src/core/MIGRATION_EXAMPLES.md#elasticsearch-client

  2. transport.request doesn't provide type safety.

  3. The current implementation doesn't prefix endpoints with _kibana as outlined in the migration path for 7.x Kibana's system indices (#81536).

In #82716, we are going to provide a separate Elasticsearch client to address points 2 & 3.
The Logstash plugin will:

  • migrate to the provided SystemIndices Elasticsearch client
  • adapt all usage sites to ensure compatibility with the new Elasticsearch client version
  • migrate to the /_logstash/pipeline/ endpoints (as done in this PR)

How soon do you want to migrate to System Indices? Can you wait for #82716?
@jaymode @kaisecheng


Reply from the PR author (Member):

Regarding 3, these APIs will not be prefixed with _kibana as they exist in their own plugin within Elasticsearch and not within the Kibana system index plugin.

Ideally, I'd like to see us move the Logstash UI to use the system index APIs sooner rather than later to get more of the system index work into users' hands.


Reply from a Contributor:

> Regarding 3, these APIs will not be prefixed with _kibana as they exist in their own plugin within Elasticsearch and not within the Kibana system index plugin.

Makes sense to merge it then. You just need to address the 1st point, which can be done in a follow-up.

Hopefully we can fix the 2nd problem as well with #80405 (comment)


Reply from a Contributor:

I will address the 1st point in a follow-up issue.


Reply from @roaksoax, Nov 17, 2020:

@restrry is there a meta issue/transition plan to support the new elasticsearch-js client library (not just for logstash)?

path: '/_logstash/pipeline/' + encodeURIComponent(request.params.id),

Review comment from a Contributor:

@delvedor I can't find /_logstash support in the new Elasticsearch client. Why is that?
https://github.com/elastic/elasticsearch-js/blob/master/api/kibana.d.ts


Reply from @delvedor (Member), Nov 10, 2020:

I'm not aware of that endpoint, and I wasn't able to find it in the rest-api-spec (and here). Are you sure it's a public API?


Reply from the PR author (Member):

The intent for these Logstash APIs is that they are for communication between the stack components and Elasticsearch so we intentionally did not publish a rest api spec for these. If this is necessary, we can probably add specs for these APIs.


Reply from a Contributor:

> communication between the stack components and Elasticsearch so we intentionally did not publish a rest api spec for these.

Can we add a separate REST API spec for such cases? We need to make sure that all changes in the REST API are reflected in the Kibana code as well; we rely on TypeScript type definitions generated from the REST API spec.


Reply from a Member:

++ another vote on this. It'd be nice to leverage the safety net y'all created with the API spec, but for product-to-product APIs.


Reply from the PR author (Member):

Sorry for missing an update here; the ES team discussed this and concluded that we'd add a spec and docs for this API. This work hasn't been started yet.


Reply from the PR author (Member):

The API spec files have been merged in elastic/elasticsearch#67788

method: 'DELETE',
});

return response.noContent();
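Following up on the review thread above about point 1, here is a rough sketch of what this route could look like once it moves off the legacy callAsCurrentUser to the new elasticsearch-js client that core exposes on the request handler context. This is not part of the PR, and the context property names are assumptions:

```ts
import { schema } from '@kbn/config-schema';
import { IRouter } from 'src/core/server';

// Hypothetical follow-up, not in this PR: the same DELETE issued through the
// new elasticsearch-js client instead of the legacy callAsCurrentUser.
export function registerPipelineDeleteRouteWithNewClient(router: IRouter) {
  router.delete(
    {
      path: '/api/logstash/pipeline/{id}',
      validate: { params: schema.object({ id: schema.string() }) },
    },
    async (context, request, response) => {
      const { asCurrentUser } = context.core.elasticsearch.client;

      // transport.request reaches the _logstash endpoints even though the new
      // client has no dedicated helper for them yet (see the thread above).
      await asCurrentUser.transport.request({
        method: 'DELETE',
        path: `/_logstash/pipeline/${encodeURIComponent(request.params.id)}`,
      });

      return response.noContent();
    }
  );
}
```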
10 changes: 4 additions & 6 deletions x-pack/plugins/logstash/server/routes/pipeline/load.ts
@@ -6,7 +6,6 @@
import { schema } from '@kbn/config-schema';
import { IRouter } from 'src/core/server';

import { INDEX_NAMES } from '../../../common/constants';
import { Pipeline } from '../../models/pipeline';
import { wrapRouteWithLicenseCheck } from '../../../../licensing/server';
import { checkLicense } from '../../lib/check_license';
@@ -26,14 +25,13 @@ export function registerPipelineLoadRoute(router: IRouter) {
router.handleLegacyErrors(async (context, request, response) => {
const client = context.logstash!.esClient;

const result = await client.callAsCurrentUser('get', {
index: INDEX_NAMES.PIPELINES,
id: request.params.id,
_source: ['description', 'username', 'pipeline', 'pipeline_settings'],
const result = await client.callAsCurrentUser('transport.request', {
path: '/_logstash/pipeline/' + encodeURIComponent(request.params.id),
method: 'GET',
ignore: [404],
});

if (!result.found) {
if (result[request.params.id] === undefined) {
return response.notFound();
}

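A note on the not-found handling: because the call passes ignore: [404], a missing pipeline resolves normally instead of throwing, so the route detects the miss by checking whether the requested id appears as a key in the response body. A tiny illustration; the exact 404 body is an assumption, and the check only relies on the id key being absent:

```ts
// Illustrative response shapes for GET /_logstash/pipeline/apache.
const foundBody: Record<string, any> = {
  apache: { description: 'this is an apache pipeline', username: 'elastic' },
};
const notFoundBody: Record<string, any> = {}; // assumed body once the 404 is ignored

const exists = (body: Record<string, any>, id: string) => body[id] !== undefined;

console.log(exists(foundBody, 'apache')); // true  -> route builds a Pipeline
console.log(exists(notFoundBody, 'apache')); // false -> route returns 404
```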
8 changes: 3 additions & 5 deletions x-pack/plugins/logstash/server/routes/pipeline/save.ts
@@ -7,7 +7,6 @@ import { schema } from '@kbn/config-schema';
import { i18n } from '@kbn/i18n';
import { IRouter } from 'src/core/server';

import { INDEX_NAMES } from '../../../common/constants';
import { Pipeline } from '../../models/pipeline';
import { wrapRouteWithLicenseCheck } from '../../../../licensing/server';
import { SecurityPluginSetup } from '../../../../security/server';
@@ -41,11 +40,10 @@ export function registerPipelineSaveRoute(router: IRouter, security?: SecurityPl
const client = context.logstash!.esClient;
const pipeline = Pipeline.fromDownstreamJSON(request.body, request.params.id, username);

await client.callAsCurrentUser('index', {
index: INDEX_NAMES.PIPELINES,
id: pipeline.id,
await client.callAsCurrentUser('transport.request', {
path: '/_logstash/pipeline/' + encodeURIComponent(pipeline.id),
method: 'PUT',
body: pipeline.upstreamJSON,
refresh: 'wait_for',
});

return response.noContent();
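For context on what the PUT sends: pipeline.upstreamJSON is the document body stored at /_logstash/pipeline/<id>. Roughly, based on the fixtures earlier in this diff; the concrete values and the settings key are placeholders:

```ts
// Placeholder document illustrating the PUT body; the real value comes from
// Pipeline.upstreamJSON.
const upstreamJSON = {
  description: 'this is an apache pipeline',
  last_modified: new Date().toISOString(),
  pipeline_metadata: { version: 1 },
  username: 'elastic',
  pipeline: 'input {} filter { grok {} }\n output {}',
  pipeline_settings: { 'pipeline.batch.delay': 50 },
};
```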
8 changes: 3 additions & 5 deletions x-pack/plugins/logstash/server/routes/pipelines/delete.ts
@@ -7,15 +7,13 @@ import { schema } from '@kbn/config-schema';
import { LegacyAPICaller, IRouter } from 'src/core/server';
import { wrapRouteWithLicenseCheck } from '../../../../licensing/server';

import { INDEX_NAMES } from '../../../common/constants';
import { checkLicense } from '../../lib/check_license';

async function deletePipelines(callWithRequest: LegacyAPICaller, pipelineIds: string[]) {
const deletePromises = pipelineIds.map((pipelineId) => {
return callWithRequest('delete', {
index: INDEX_NAMES.PIPELINES,
id: pipelineId,
refresh: 'wait_for',
return callWithRequest('transport.request', {
path: '/_logstash/pipeline/' + encodeURIComponent(pipelineId),
method: 'DELETE',
})
.then((success) => ({ success }))
.catch((error) => ({ error }));
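The map/then/catch above converts each per-pipeline DELETE into a settled {success} or {error} value, so a single failure does not abort the whole bulk delete. The rest of the handler is truncated here; a purely illustrative tally of those results could look like this:

```ts
type DeleteOutcome = { success: unknown } | { error: unknown };

// Illustrative only: count how many per-pipeline deletes succeeded or failed.
function summarizeDeletes(results: DeleteOutcome[]) {
  const numSuccesses = results.filter((r) => 'success' in r).length;
  return { numSuccesses, numErrors: results.length - numSuccesses };
}
```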