// cdk-callback-stack.ts
import { Stack, StackProps } from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as cdk from 'aws-cdk-lib';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as sns from 'aws-cdk-lib/aws-sns';
import * as apiGateway from 'aws-cdk-lib/aws-apigateway';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as subscriptions from 'aws-cdk-lib/aws-sns-subscriptions';
import {SqsEventSource} from 'aws-cdk-lib/aws-lambda-event-sources';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';
import * as events from 'aws-cdk-lib/aws-events';
import * as targets from 'aws-cdk-lib/aws-events-targets';
import * as logs from 'aws-cdk-lib/aws-logs';
/**
 * CDK stack for a Step Functions "wait for callback" workflow.
 *
 * Resources defined here:
 *  - A REGIONAL REST API (stage `dev`) exposing `GET /verification`, wired through a
 *    non-proxy Lambda integration to `LambdaVerificationSuccess`.
 *  - An SQS queue (`VerificationQueue`) and an SNS topic (`cdk-sns-callback`) with an
 *    email subscription.
 *  - `LambdaVerificationMessage`, triggered by the queue and allowed to publish to the topic.
 *  - A STANDARD state machine (`WaitForCallback`, 5 minute timeout) that runs
 *    "Generate task" -> "Request user varification" (SQS send that waits for a task
 *    token) -> "Return to main processing".
 *  - An EventBridge rule that starts the state machine every 10 minutes.
 */
export class CdkCallbackStack extends Stack {
  /** The `WaitForCallback` state machine created by this stack. */
  public Machine: sfn.StateMachine;

  /**
   * @param scope Parent construct.
   * @param id    Construct id of the stack.
   * @param props Optional standard stack properties, forwarded to `Stack`.
   */
  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id, props);

    // ---------------------------------------------------------------- API Gateway
    const stage = "dev";

    // Access-log destination for the API stage.
    // (The construct id keeps its original spelling so the logical id stays stable.)
    const logGroup = new logs.LogGroup(this, 'AccessLogsFowStepFunction', {
      retention: logs.RetentionDays.THREE_MONTHS, // keep logs for 90 days
    });
    logGroup.grantWrite(new iam.ServicePrincipal('apigateway.amazonaws.com'));

    // Role API Gateway assumes when calling the integration (passed as credentialsRole below).
    const apiRole = new iam.Role(this, "api-role-stepfunction", {
      roleName: "ApiRoleStepFunction",
      assumedBy: new iam.ServicePrincipal("apigateway.amazonaws.com")
    });
    // Partition-aware reference to the AWS managed policy instead of a hard-coded `arn:aws:` ARN.
    apiRole.addManagedPolicy(iam.ManagedPolicy.fromAwsManagedPolicyName('AWSLambdaExecute'));

    const apigw = new apiGateway.RestApi(this, 'ApiStepFunction', {
      description: 'API Gateway for Step Function',
      endpointTypes: [apiGateway.EndpointType.REGIONAL],
      defaultMethodOptions: {
        // NOTE(review): the callback endpoint is unauthenticated — confirm this is intended.
        authorizationType: apiGateway.AuthorizationType.NONE
      },
      deployOptions: {
        stageName: stage,
        accessLogDestination: new apiGateway.LogGroupLogDestination(logGroup),
        accessLogFormat: apiGateway.AccessLogFormat.jsonWithStandardFields({
          caller: false,
          httpMethod: true,
          ip: true,
          protocol: true,
          requestTime: true,
          resourcePath: true,
          responseLength: true,
          status: true,
          user: true
        }),
      },
    });

    // Mapping template: copies three request parameters into the JSON event
    // handed to the integration.
    const templateString = [
      `#set($inputRoot = $input.path('$'))`,
      `{`,
      `"requestId": "$input.params('requestId')",`,
      `"timestamp": "$input.params('timestamp')",`,
      `"token": "$input.params('token')"`,
      `}`,
    ].join('\n');
    const requestTemplates = { // pass through
      'application/json': templateString,
    };

    // Lambda behind GET /verification.
    const lambdaVerificationSuccess = this.createNodeFunction(
      "LambdaVerificationSuccess",
      'make verification success',
      "../lambda-for-verification-success",
    );
    // Least privilege: the API role may invoke only this function (previously `Resource: *`).
    lambdaVerificationSuccess.grantInvoke(apiRole);

    const apiName = "verification";
    const status = apigw.root.addResource(apiName);
    status.addMethod('GET', new apiGateway.LambdaIntegration(lambdaVerificationSuccess, {
      passthroughBehavior: apiGateway.PassthroughBehavior.WHEN_NO_TEMPLATES, // options: NEVER
      credentialsRole: apiRole,
      requestTemplates: requestTemplates,
      integrationResponses: [{
        statusCode: '200',
      }],
      proxy: false,
    }), {
      // Declared parameters now match what the mapping template actually reads.
      // NOTE(review): no request validator is attached in this stack, so the
      // required/optional flags are declarative only — confirm which ones the
      // handler really needs.
      requestParameters: {
        'method.request.querystring.token': true,
        'method.request.querystring.requestId': false,
        'method.request.querystring.timestamp': false,
      },
      methodResponses: [ // what API Gateway sends to the client that called the method
        {
          statusCode: '200',
          responseModels: {
            'application/json': apiGateway.Model.EMPTY_MODEL,
          },
        }
      ]
    });

    new cdk.CfnOutput(this, 'EndpointUrl', {
      value: apigw.url,
      description: 'The endpoint of API Gateway',
    });

    // ---------------------------------------------------------------- SQS
    const queue = new sqs.Queue(this, 'VerificationQueue');
    new cdk.CfnOutput(this, 'sqsVerificationUrl', {
      value: queue.queueUrl,
      description: 'The url of the Verification Queue',
    });

    // ---------------------------------------------------------------- SNS
    const topic = new sns.Topic(this, 'cdk-sns-callback', {
      topicName: 'cdk-sns-callback',
      fifo: false // standard
    });
    // NOTE(review): hard-coded subscriber address; consider sourcing it from configuration.
    topic.addSubscription(new subscriptions.EmailSubscription('storytimebot21@gmail.com'));
    new cdk.CfnOutput(this, 'snsTopicArn', {
      value: topic.topicArn,
      description: 'The arn of the SNS topic',
    });

    // ---------------------------------------------------------------- Lambdas
    // First step of the workflow.
    const lambdaTaskGenerator = this.createNodeFunction(
      "LambdaTaskGenerator",
      'generate task info',
      "../lambda-for-task-generator",
    );
    new cdk.CfnOutput(this, 'LambdaTaskGeneratorARN', {
      value: lambdaTaskGenerator.functionArn,
      description: 'The arn of lambda for task generator',
    });

    // Queue consumer; receives the API location, queue URL and topic ARN via environment.
    const lambdaVerificationMessage = this.createNodeFunction(
      "LambdaVerificationMessage",
      'make verification message',
      "../lambda-for-verification-message",
      {
        apiName: apiName,
        apigwUrl: apigw.url,
        sqsUrl: queue.queueUrl,
        topicArn: topic.topicArn
      },
    );
    new cdk.CfnOutput(this, 'LambdaVerificationMessageARN', {
      value: lambdaVerificationMessage.functionArn,
      description: 'The arn of lambda for verification message',
    });

    // Let the function consume from the queue and publish to the topic,
    // then register the queue as its event source.
    queue.grantConsumeMessages(lambdaVerificationMessage);
    topic.grantPublish(lambdaVerificationMessage);
    lambdaVerificationMessage.addEventSource(new SqsEventSource(queue));

    // Last step of the workflow.
    const lambdaProcessing = this.createNodeFunction(
      "LambdaProcessing",
      'main processing',
      "../lambda-for-processing",
    );

    // ---------------------------------------------------------------- Step Functions
    // Step 1: invoke the task generator and keep only its payload.
    const lambdaTaskGeneratorInvocation = new tasks.LambdaInvoke(this, 'Generate task', {
      lambdaFunction: lambdaTaskGenerator,
      outputPath: '$.Payload',
    });

    // Step 2: send the current state plus the task token to the queue and pause
    // until the token is reported back.
    // (The id/state name keeps its original spelling to avoid renaming the state.)
    const sqsVerificationTask = new tasks.SqsSendMessage(this, 'Request user varification', {
      queue,
      messageBody: sfn.TaskInput.fromObject({
        "Payload.$": "$",
        TaskToken: sfn.JsonPath.taskToken
      }),
      integrationPattern: sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
    });

    // Step 3: pass the whole state to the processing Lambda under the `Task` key.
    const lambdaProcessingInvocation = new tasks.LambdaInvoke(this, 'Return to main processing', {
      lambdaFunction: lambdaProcessing,
      inputPath: '$',
      outputPath: '$.Payload',
      payload: sfn.TaskInput.fromObject({
        "Task.$": "$",
      })
    });

    const definition = lambdaTaskGeneratorInvocation
      .next(sqsVerificationTask)
      .next(lambdaProcessingInvocation);

    this.Machine = new sfn.StateMachine(this, "StateMachine", {
      definition,
      stateMachineName: 'WaitForCallback',
      stateMachineType: sfn.StateMachineType.STANDARD,
      timeout: cdk.Duration.minutes(5),
    });
    new cdk.CfnOutput(this, 'StateMachineARN', {
      value: this.Machine.stateMachineArn,
      description: 'The arn of StateMachineARN',
    });

    // Explicit grants for the resources the state machine uses.
    queue.grantSendMessages(this.Machine);
    lambdaTaskGenerator.grantInvoke(this.Machine);
    lambdaProcessing.grantInvoke(this.Machine);

    // Allow the callback Lambda to report task success/failure/heartbeat to the
    // state machine (same helper as used for the role below).
    this.Machine.grantTaskResponse(lambdaVerificationSuccess);

    // NOTE(review): this role is created and granted task-response rights but is not
    // attached to anything in this stack (original "To-Do check required??").
    // Kept so the set of deployed resources does not change.
    const roleStateMachine = new iam.Role(this, 'RoleStateMachine', {
      assumedBy: new iam.ServicePrincipal('states.amazonaws.com'),
    });
    this.Machine.grantTaskResponse(roleStateMachine);

    // ---------------------------------------------------------------- EventBridge
    const eventRole = new iam.Role(this, 'EventBridgeRole', {
      assumedBy: new iam.ServicePrincipal('events.amazonaws.com'),
    });
    const rule = new events.Rule(this, 'Cron', {
      description: "Schedule for cron job",
      schedule: events.Schedule.expression('rate(10 minutes)'),
    });
    // Start the state machine with an empty input on every tick.
    rule.addTarget(new targets.SfnStateMachine(this.Machine, {
      input: events.RuleTargetInput.fromObject({}),
      role: eventRole
    }));
  }

  /**
   * Creates one of the stack's Lambda functions with the settings they all share
   * (`index.handler`, 3 second timeout, common runtime).
   *
   * @param id          Construct id of the function.
   * @param description Function description.
   * @param assetPath   Directory holding the handler code.
   * @param environment Environment variables; defaults to none.
   * @returns The created function.
   */
  private createNodeFunction(
    id: string,
    description: string,
    assetPath: string,
    environment: Record<string, string> = {},
  ): lambda.Function {
    return new lambda.Function(this, id, {
      description,
      // NOTE(review): NODEJS_14_X is a deprecated Lambda runtime. Upgrading requires
      // checking the handler code (not visible from this file), so it is left as-is.
      runtime: lambda.Runtime.NODEJS_14_X,
      code: lambda.Code.fromAsset(assetPath),
      handler: "index.handler",
      timeout: cdk.Duration.seconds(3),
      environment,
    });
  }
}