-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
helpers.ts
174 lines (157 loc) · 5.97 KB
/
helpers.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
import * as iam from 'aws-cdk-lib/aws-iam';
import * as firehose from '@aws-cdk/aws-kinesisfirehose-alpha';
import { CfnDeliveryStream } from 'aws-cdk-lib/aws-kinesisfirehose';
import * as kms from 'aws-cdk-lib/aws-kms';
import * as logs from 'aws-cdk-lib/aws-logs';
import * as s3 from 'aws-cdk-lib/aws-s3';
import * as cdk from 'aws-cdk-lib/core';
import { Construct, IDependable, Node } from 'constructs';
import { DestinationS3BackupProps } from '../common';
export interface DestinationLoggingProps {
/**
* If true, log errors when data transformation or data delivery fails.
*
* If `logGroup` is provided, this will be implicitly set to `true`.
*
* @default true - errors are logged.
*/
readonly logging?: boolean;
/**
* The CloudWatch log group where log streams will be created to hold error logs.
*
* @default - if `logging` is set to `true`, a log group will be created for you.
*/
readonly logGroup?: logs.ILogGroup;
/**
* The IAM role associated with this destination.
*/
readonly role: iam.IRole;
/**
* The ID of the stream that is created in the log group where logs will be placed.
*
* Must be unique within the log group, so should be different every time this function is called.
*/
readonly streamId: string;
}
interface ConfigWithDependables {
/**
* Resources that were created by the sub-config creator that must be deployed before the delivery stream is deployed.
*/
readonly dependables: IDependable[];
}
export interface DestinationLoggingConfig extends ConfigWithDependables {
/**
* Logging options that will be injected into the destination configuration.
*/
readonly loggingOptions: CfnDeliveryStream.CloudWatchLoggingOptionsProperty;
}
export interface DestinationBackupConfig extends ConfigWithDependables {
/**
* S3 backup configuration that will be injected into the destination configuration.
*/
readonly backupConfig: CfnDeliveryStream.S3DestinationConfigurationProperty;
}
export function createLoggingOptions(scope: Construct, props: DestinationLoggingProps): DestinationLoggingConfig | undefined {
if (props.logging === false && props.logGroup) {
throw new Error('logging cannot be set to false when logGroup is provided');
}
if (props.logging !== false || props.logGroup) {
const logGroup = props.logGroup ?? Node.of(scope).tryFindChild('LogGroup') as logs.ILogGroup ?? new logs.LogGroup(scope, 'LogGroup');
const logGroupGrant = logGroup.grantWrite(props.role);
return {
loggingOptions: {
enabled: true,
logGroupName: logGroup.logGroupName,
logStreamName: logGroup.addStream(props.streamId).logStreamName,
},
dependables: [logGroupGrant],
};
}
return undefined;
}
export function createBufferingHints(
interval?: cdk.Duration,
size?: cdk.Size,
): CfnDeliveryStream.BufferingHintsProperty | undefined {
if (!interval && !size) {
return undefined;
}
const intervalInSeconds = interval?.toSeconds() ?? 300;
if (intervalInSeconds > 900) {
throw new Error(`Buffering interval must be less than 900 seconds. Buffering interval provided was ${intervalInSeconds} seconds.`);
}
const sizeInMBs = size?.toMebibytes() ?? 5;
if (sizeInMBs < 1 || sizeInMBs > 128) {
throw new Error(`Buffering size must be between 1 and 128 MiBs. Buffering size provided was ${sizeInMBs} MiBs.`);
}
return { intervalInSeconds, sizeInMBs };
}
export function createEncryptionConfig(
role: iam.IRole,
encryptionKey?: kms.IKey,
): CfnDeliveryStream.EncryptionConfigurationProperty | undefined {
encryptionKey?.grantEncryptDecrypt(role);
return encryptionKey
? { kmsEncryptionConfig: { awskmsKeyArn: encryptionKey.keyArn } }
: undefined;
}
export function createProcessingConfig(
scope: Construct,
role: iam.IRole,
dataProcessor?: firehose.IDataProcessor,
): CfnDeliveryStream.ProcessingConfigurationProperty | undefined {
return dataProcessor
? {
enabled: true,
processors: [renderDataProcessor(dataProcessor, scope, role)],
}
: undefined;
}
function renderDataProcessor(
processor: firehose.IDataProcessor,
scope: Construct,
role: iam.IRole,
): CfnDeliveryStream.ProcessorProperty {
const processorConfig = processor.bind(scope, { role });
const parameters = [{ parameterName: 'RoleArn', parameterValue: role.roleArn }];
parameters.push(processorConfig.processorIdentifier);
if (processor.props.bufferInterval) {
parameters.push({ parameterName: 'BufferIntervalInSeconds', parameterValue: processor.props.bufferInterval.toSeconds().toString() });
}
if (processor.props.bufferSize) {
parameters.push({ parameterName: 'BufferSizeInMBs', parameterValue: processor.props.bufferSize.toMebibytes().toString() });
}
if (processor.props.retries) {
parameters.push({ parameterName: 'NumberOfRetries', parameterValue: processor.props.retries.toString() });
}
return {
type: processorConfig.processorType,
parameters,
};
}
export function createBackupConfig(scope: Construct, role: iam.IRole, props?: DestinationS3BackupProps): DestinationBackupConfig | undefined {
if (!props || (props.mode === undefined && !props.bucket)) {
return undefined;
}
const bucket = props.bucket ?? new s3.Bucket(scope, 'BackupBucket');
const bucketGrant = bucket.grantReadWrite(role);
const { loggingOptions, dependables: loggingDependables } = createLoggingOptions(scope, {
logging: props.logging,
logGroup: props.logGroup,
role,
streamId: 'S3Backup',
}) ?? {};
return {
backupConfig: {
bucketArn: bucket.bucketArn,
roleArn: role.roleArn,
prefix: props.dataOutputPrefix,
errorOutputPrefix: props.errorOutputPrefix,
bufferingHints: createBufferingHints(props.bufferingInterval, props.bufferingSize),
compressionFormat: props.compression?.value,
encryptionConfiguration: createEncryptionConfig(role, props.encryptionKey),
cloudWatchLoggingOptions: loggingOptions,
},
dependables: [bucketGrant, ...(loggingDependables ?? [])],
};
}