Allow deploying a Kinesis function using the CLI
This commit adds support for publishing a function that listens to a Kinesis
stream using the CLI.

Features:

- Kinesis triggers can now be defined in the YML.
- The AWS installer can now define custom policies to be added to the Lambda
role.
- Kinesis supports `TRIM_HORIZON` and `LATEST` as the starting position
strategy; `AT_TIMESTAMP` is currently not supported because the
CloudFormation API doesn't accept a timestamp when configuring the
subscription.
- Kinesis allows configuring the batch size (see the example below).
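For illustration, here is a minimal function configuration exercising the new options, assembled from the reference config added in this commit (the account ID and stream name are placeholders):

```yaml
functionbeat.provider.aws.functions:
  # Create a function that accepts events from Kinesis streams.
  - name: kinesis
    enabled: true
    type: kinesis
    description: "lambda function for Kinesis events"
    triggers:
      # ARN of the Kinesis stream to subscribe to.
      - event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:stream/myevents
        # Number of events read in a batch. Default is 100.
        batch_size: 100
        # Where to start reading: trim_horizon (default) or latest.
        starting_position: "trim_horizon"
```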
ph committed Jan 17, 2019
1 parent ac6aa32 commit 0482fc8
Showing 10 changed files with 604 additions and 24 deletions.
55 changes: 53 additions & 2 deletions x-pack/functionbeat/_meta/beat.reference.yml
@@ -59,7 +59,7 @@ functionbeat.provider.aws.functions:
type: sqs

# Description of the function to help identify it when you run multiple functions.
description: "lambda function for sqs events"
description: "lambda function for SQS events"

# Concurrency is the reserved number of instances for that function.
# Default is 5.
@@ -80,8 +80,9 @@ functionbeat.provider.aws.functions:
#fields:
# env: staging

# List of cloudwatch log group registered to that function.
# List of SQS queues.
triggers:
# ARN of the SQS queue.
- event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents

# Define custom processors for this function.
@@ -92,3 +93,53 @@ functionbeat.provider.aws.functions:
# max_depth: 1
# target: ""
# overwrite_keys: false
#

# Create a function that accepts events from Kinesis streams.
- name: kinesis
enabled: false
type: kinesis

# Description of the function to help identify it when you run multiple functions.
description: "lambda function for Kinesis events"

# Concurrency is the reserved number of instances for that function.
# Default is 5.
#
# Note: There is a hard limit of 1000 functions of any kind per account.
#concurrency: 5

# The maximum memory allocated for this function. The configured size must be a multiple of 64.
# There is a hard limit of 3008MiB for each function. Default is 128MiB.
#memory_size: 128MiB

# Dead letter queue configuration; this must be set to an ARN pointing to an SQS queue.
#dead_letter_config.target_arn:

# Optional fields that you can specify to add additional information to the
# output. Fields can be scalar values, arrays, dictionaries, or any nested
# combination of these.
#fields:
# env: staging

# Define custom processors for this function.
#processors:
# - decode_json_fields:
# fields: ["message"]
# process_array: false
# max_depth: 1
# target: ""
# overwrite_keys: false

# List of Kinesis streams.
triggers:
# ARN of the Kinesis stream.
- event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:stream/myevents

# batch_size is the number of events read in a batch.
# Default is 100.
#batch_size: 100

# Starting position is where to start reading events from the Kinesis stream.
# Default is trim_horizon.
#starting_position: "trim_horizon"
55 changes: 55 additions & 0 deletions x-pack/functionbeat/_meta/beat.yml
@@ -81,6 +81,48 @@ functionbeat.provider.aws.functions:
#fields:
# env: staging

# List of SQS queues.
triggers:
# ARN of the SQS queue.
- event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents

# Define custom processors for this function.
#processors:
# - decode_json_fields:
# fields: ["message"]
# process_array: false
# max_depth: 1
# target: ""
# overwrite_keys: false
#

# Create a function that accepts events from Kinesis streams.
- name: kinesis
enabled: false
type: kinesis

# Description of the function to help identify it when you run multiple functions.
description: "lambda function for Kinesis events"

# Concurrency is the reserved number of instances for that function.
# Default is 5.
#
# Note: There is a hard limit of 1000 functions of any kind per account.
#concurrency: 5

# The maximum memory allocated for this function. The configured size must be a multiple of 64.
# There is a hard limit of 3008MiB for each function. Default is 128MiB.
#memory_size: 128MiB

# Dead letter queue configuration; this must be set to an ARN pointing to an SQS queue.
#dead_letter_config.target_arn:

# Optional fields that you can specify to add additional information to the
# output. Fields can be scalar values, arrays, dictionaries, or any nested
# combination of these.
#fields:
# env: staging

# Define custom processors for this function.
#processors:
# - decode_json_fields:
@@ -89,3 +131,16 @@ functionbeat.provider.aws.functions:
# max_depth: 1
# target: ""
# overwrite_keys: false

# List of Kinesis streams.
triggers:
# ARN of the Kinesis stream.
- event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:stream/myevents

# batch_size is the number of events read in a batch.
# Default is 100.
#batch_size: 100

# Starting position is where to start reading events from the Kinesis stream.
# Default is trim_horizon.
#starting_position: "trim_horizon"
15 changes: 15 additions & 0 deletions x-pack/functionbeat/docs/config-options.asciidoc
@@ -78,6 +78,7 @@ are:

`cloudwatch_logs`:: Collects events from CloudWatch logs.
`sqs`:: Collects data from Amazon Simple Queue Service (SQS).
`kinesis`:: Collects data from a Kinesis stream.

[float]
[id="{beatname_lc}-description"]
@@ -125,3 +126,17 @@ default is 128 MiB.

The dead letter queue to use for messages that can't be processed successfully.
Set this option to an ARN that points to an SQS queue.

[float]
[id="{beatname_lc}-batch-size"]
==== `batch_size`

The number of events to read from a Kinesis stream. The minimum value is 100 and the maximum is
10000. The default is 100.

[float]
[id="{beatname_lc}-batch-size"]
==== `starting_position`

The starting position to read from a Kinesis stream. Valid values are `trim_horizon` and `latest`.
The default is `trim_horizon`.
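For example, a trigger that reads larger batches starting from the most recent records might be configured like this (a sketch; the stream ARN is a placeholder):

[source,yaml]
----
triggers:
  - event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:stream/myevents
    # Read up to 500 events per batch instead of the default 100.
    batch_size: 500
    # Start from the newest records rather than the oldest retained ones.
    starting_position: "latest"
----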
55 changes: 53 additions & 2 deletions x-pack/functionbeat/functionbeat.reference.yml
@@ -59,7 +59,7 @@ functionbeat.provider.aws.functions:
type: sqs

# Description of the function to help identify it when you run multiple functions.
description: "lambda function for sqs events"
description: "lambda function for SQS events"

# Concurrency is the reserved number of instances for that function.
# Default is 5.
@@ -80,8 +80,9 @@ functionbeat.provider.aws.functions:
#fields:
# env: staging

# List of cloudwatch log group registered to that function.
# List of SQS queues.
triggers:
# ARN of the SQS queue.
- event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents

# Define custom processors for this function.
@@ -92,6 +93,56 @@ functionbeat.provider.aws.functions:
# max_depth: 1
# target: ""
# overwrite_keys: false
#

# Create a function that accepts events from Kinesis streams.
- name: kinesis
enabled: false
type: kinesis

# Description of the function to help identify it when you run multiple functions.
description: "lambda function for Kinesis events"

# Concurrency is the reserved number of instances for that function.
# Default is 5.
#
# Note: There is a hard limit of 1000 functions of any kind per account.
#concurrency: 5

# The maximum memory allocated for this function. The configured size must be a multiple of 64.
# There is a hard limit of 3008MiB for each function. Default is 128MiB.
#memory_size: 128MiB

# Dead letter queue configuration; this must be set to an ARN pointing to an SQS queue.
#dead_letter_config.target_arn:

# Optional fields that you can specify to add additional information to the
# output. Fields can be scalar values, arrays, dictionaries, or any nested
# combination of these.
#fields:
# env: staging

# Define custom processors for this function.
#processors:
# - decode_json_fields:
# fields: ["message"]
# process_array: false
# max_depth: 1
# target: ""
# overwrite_keys: false

# List of Kinesis streams.
triggers:
# ARN of the Kinesis stream.
- event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:stream/myevents

# batch_size is the number of events read in a batch.
# Default is 100.
#batch_size: 100

# Starting position is where to start reading events from the Kinesis stream.
# Default is trim_horizon.
#starting_position: "trim_horizon"

#================================ General ======================================

55 changes: 55 additions & 0 deletions x-pack/functionbeat/functionbeat.yml
@@ -81,6 +81,48 @@ functionbeat.provider.aws.functions:
#fields:
# env: staging

# List of SQS queues.
triggers:
# ARN of the SQS queue.
- event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents

# Define custom processors for this function.
#processors:
# - decode_json_fields:
# fields: ["message"]
# process_array: false
# max_depth: 1
# target: ""
# overwrite_keys: false
#

# Create a function that accepts events from Kinesis streams.
- name: kinesis
enabled: false
type: kinesis

# Description of the function to help identify it when you run multiple functions.
description: "lambda function for Kinesis events"

# Concurrency is the reserved number of instances for that function.
# Default is 5.
#
# Note: There is a hard limit of 1000 functions of any kind per account.
#concurrency: 5

# The maximum memory allocated for this function. The configured size must be a multiple of 64.
# There is a hard limit of 3008MiB for each function. Default is 128MiB.
#memory_size: 128MiB

# Dead letter queue configuration; this must be set to an ARN pointing to an SQS queue.
#dead_letter_config.target_arn:

# Optional fields that you can specify to add additional information to the
# output. Fields can be scalar values, arrays, dictionaries, or any nested
# combination of these.
#fields:
# env: staging

# Define custom processors for this function.
#processors:
# - decode_json_fields:
@@ -90,6 +132,19 @@ functionbeat.provider.aws.functions:
# target: ""
# overwrite_keys: false

# List of Kinesis streams.
triggers:
# ARN of the Kinesis stream.
- event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:stream/myevents

# batch_size is the number of events read in a batch.
# Default is 100.
#batch_size: 100

# Starting position is where to start reading events from the Kinesis stream.
# Default is trim_horizon.
#starting_position: "trim_horizon"

#================================ General =====================================

# The name of the shipper that publishes the network data. It can be used to group
39 changes: 23 additions & 16 deletions x-pack/functionbeat/provider/aws/cli_manager.go
@@ -40,6 +40,7 @@ type AWSLambdaFunction struct {
}

type installer interface {
Policies() []cloudformation.AWSIAMRole_Policy
Template() *cloudformation.Template
LambdaConfig() *lambdaConfig
}
@@ -84,6 +85,27 @@ func (c *CLIManager) template(function installer, name, codeLoc string) *cloudfo
// Documentation: https://docs.aws.amazon.com/AWSCloudFormation/latest/APIReference/Welcome.html
// Intrinsic function reference: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/intrinsic-function-reference.html

// Default policies to write logs from the Lambda.
policies := []cloudformation.AWSIAMRole_Policy{
cloudformation.AWSIAMRole_Policy{
PolicyName: cloudformation.Join("-", []string{"fnb", "lambda", name}),
PolicyDocument: map[string]interface{}{
"Statement": []map[string]interface{}{
map[string]interface{}{
"Action": []string{"logs:CreateLogStream", "Logs:PutLogEvents"},
"Effect": "Allow",
"Resource": []string{
cloudformation.Sub("arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/" + name + ":*"),
},
},
},
},
},
}

// Merge any specific policies from the service.
policies = append(policies, function.Policies()...)

// Create the roles for the lambda.
template := cloudformation.NewTemplate()
// doc: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-iam-role.html
@@ -106,22 +128,7 @@ func (c *CLIManager) template(function installer, name, codeLoc string) *cloudfo
RoleName: "functionbeat-lambda-" + name,
// Allow the lambda to write log to cloudwatch logs.
// doc: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-iam-policy.html
Policies: []cloudformation.AWSIAMRole_Policy{
cloudformation.AWSIAMRole_Policy{
PolicyName: cloudformation.Join("-", []string{"fnb", "lambda", name}),
PolicyDocument: map[string]interface{}{
"Statement": []map[string]interface{}{
map[string]interface{}{
"Action": []string{"logs:CreateLogStream", "Logs:PutLogEvents"},
"Effect": "Allow",
"Resource": []string{
cloudformation.Sub("arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/" + name + ":*"),
},
},
},
},
},
},
Policies: policies,
}

// Configure the dead letter queue; any failed events will be sent to the configured Amazon resource name.
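Taken together, the role emitted by the template builder now carries the default logging policy plus whatever the function's installer contributes through `Policies()`. Rendered as CloudFormation YAML it would look roughly like the following sketch (the logical ID and function name are illustrative):

```yaml
IAMRoleLambdaExecution:        # illustrative logical ID
  Type: AWS::IAM::Role
  Properties:
    RoleName: functionbeat-lambda-kinesis
    Policies:
      # Default policy: let the lambda write its own CloudWatch logs.
      - PolicyName: fnb-lambda-kinesis
        PolicyDocument:
          Statement:
            - Effect: Allow
              Action: ["logs:CreateLogStream", "logs:PutLogEvents"]
              Resource:
                - !Sub "arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/kinesis:*"
      # ...followed by any service-specific policies from the installer.
```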
5 changes: 5 additions & 0 deletions x-pack/functionbeat/provider/aws/cloudwatch_logs.go
@@ -210,3 +210,8 @@ func (c *CloudwatchLogs) Template() *cloudformation.Template {
func (c *CloudwatchLogs) LambdaConfig() *lambdaConfig {
return c.config.LambdaConfig
}

// Policies returns a slice of policies to add to the lambda role.
func (c *CloudwatchLogs) Policies() []cloudformation.AWSIAMRole_Policy {
return []cloudformation.AWSIAMRole_Policy{}
}
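The CloudWatch Logs function contributes no extra policies, returning an empty slice. A Kinesis installer, by contrast, can use this hook to grant the Lambda read access to its stream; a hypothetical policy it might return is sketched below (the action names are standard Kinesis IAM actions; the exact set used by this commit is not shown in this excerpt):

```yaml
# Hypothetical service-specific policy contributed by a Kinesis installer.
- PolicyName: fnb-kinesis-read
  PolicyDocument:
    Statement:
      - Effect: Allow
        Action:
          - kinesis:GetRecords
          - kinesis:GetShardIterator
          - kinesis:DescribeStream
        Resource:
          - arn:aws:kinesis:us-east-1:xxxxx:stream/myevents
```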