From 845b932afdb7cc83f8bcb2524e96030443384e55 Mon Sep 17 00:00:00 2001 From: Francesco Giovannini Date: Wed, 15 Nov 2023 10:09:38 +0100 Subject: [PATCH] feat!: change constructor and methods supporting multiple queues whit one client for region/account BREAKING-CHANGE: changing constructor and methods signatures --- README.md | 26 ++++++++++---------- src/index.ts | 66 +++++++++++++++++++++++++++++--------------------- test/errors.ts | 18 +++++++------- test/index.ts | 32 ++++++++++++------------ 4 files changed, 76 insertions(+), 66 deletions(-) diff --git a/README.md b/README.md index 4d1158d..b5854e2 100644 --- a/README.md +++ b/README.md @@ -24,9 +24,9 @@ npm install @fgiova/mini-sqs-client import {MiniSQSClient} from '@fgiova/mini-sqs-client' import console = require("console"); -const client = new MiniSQSClient("arn:aws:sqs:eu-central-1:000000000000:test"); +const client = new MiniSQSClient("eu-central-1"); -await client.sendMessage({ +await client.sendMessage("arn:aws:sqs:eu-central-1:000000000000:test", { MessageBody: "Hello world", MessageAttributes: { "my-attribute": { @@ -36,7 +36,7 @@ await client.sendMessage({ } }); -const messages = await client.receiveMessage({ +const messages = await client.receiveMessage("arn:aws:sqs:eu-central-1:000000000000:test", { WaitTimeSeconds: 20, MaxNumberOfMessages: 1, MessageAttributeNames: ["my-attribute"] @@ -44,27 +44,27 @@ const messages = await client.receiveMessage({ const message = messages[0]; -await client.changeMessageVisibility({ +await client.changeMessageVisibility("arn:aws:sqs:eu-central-1:000000000000:test", { ReceiptHandle: message.ReceiptHandle, VisibilityTimeout: 10 }); console.log(message.Body); -await client.deleteMessage(message.ReceiptHandle); +await client.deleteMessage("arn:aws:sqs:eu-central-1:000000000000:test", message.ReceiptHandle); ``` ## API ```typescript -MiniSQSClient(queueARN: string, endpoint?: string, undiciOptions?: Pool.Options, signer?: Signer | SignerOptions) -MiniSQSClient.sendMessage(message: SendMessage): Promise -MiniSQSClient.sendMessageBatch(messages: SendMessage[]): Promise -MiniSQSClient.receiveMessage(options: ReceiveMessage): Promise -MiniSQSClient.deleteMessage(receiptHandle: string): Promise -MiniSQSClient.deleteMessageBatch(receiptHandles: string[]): Promise -MiniSQSClient.changeMessageVisibility(receiptHandle: string, visibilityTimeout: number): Promise -MiniSQSClient.changeMessageVisibilityBatch(receiptHandles: string[], visibilityTimeout: number): Promise +MiniSQSClient(region: string, endpoint?: string, undiciOptions?: Pool.Options, signer?: Signer | SignerOptions) +MiniSQSClient.sendMessage(queueARN: string, message: SendMessage): Promise +MiniSQSClient.sendMessageBatch(queueARN: string, messages: SendMessage[]): Promise +MiniSQSClient.receiveMessage(queueARN: string, options: ReceiveMessage): Promise +MiniSQSClient.deleteMessage(queueARN: string, receiptHandle: string): Promise +MiniSQSClient.deleteMessageBatch(queueARN: string, receiptHandles: string[]): Promise +MiniSQSClient.changeMessageVisibility(queueARN: string, receiptHandle: string, visibilityTimeout: number): Promise +MiniSQSClient.changeMessageVisibilityBatch(queueARN: string, receiptHandles: string[], visibilityTimeout: number): Promise ``` All types are defined in [schemas.ts](./src/schemas.ts) and are derived from the [AWS SQS API](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_Operations.html)
diff --git a/src/index.ts b/src/index.ts index 8b5aecf..d497585 100644 --- a/src/index.ts +++ b/src/index.ts @@ -10,20 +10,17 @@ import type { } from "./schemas"; export class MiniSQSClient { - private readonly queueSettings: { - region: string, - accountId: string, - queueName: string, - host: string, - endpoint: string - }; private readonly pool: Pool; private readonly undiciOptions: Pool.Options; private readonly signer: Signer; - constructor (queueARN: string, endpoint?: string, undiciOptions?: Pool.Options, signer?: Signer | SignerOptions) { + private region: string; + private endpoint: string; + + constructor (region: string, endpoint?: string, undiciOptions?: Pool.Options, signer?: Signer | SignerOptions) { this.undiciOptions = undiciOptions; - this.queueSettings = this.getQueueARN(queueARN, endpoint); - this.pool = new Pool(this.queueSettings.endpoint, undiciOptions); + this.region = region; + this.endpoint = endpoint ?? `https://sqs.${region}.amazonaws.com`; + this.pool = new Pool(this.endpoint, undiciOptions); if (signer instanceof Signer) { this.signer = signer; } @@ -42,9 +39,10 @@ export class MiniSQSClient { ]); } - private getQueueARN (queueARN: string, endpoint?: string) { + private getQueueARN (queueARN: string) { const [queueName, accountId, region] = queueARN.split(":").reverse(); - endpoint = endpoint ?? `https://sqs.${region}.amazonaws.com`; + if (region !== this.region) throw new Error(`Region ${region} does not match ${this.region}`); + const endpoint = this.endpoint; const url = new URL(endpoint); return { region, @@ -54,8 +52,14 @@ export class MiniSQSClient { endpoint } } - private async SQSRequest(body: B, target: SQSTarget, JSONResponse= true) { - const {region, accountId, queueName, host} = this.queueSettings; + private async SQSRequest(body: B, target: SQSTarget, queueSettings: { + region: string, + accountId: string, + queueName: string, + host: string, + endpoint: string + }, JSONResponse= true) { + const {region, accountId, queueName, host} = queueSettings; const requestBody = JSON.stringify({ ...body }); @@ -108,20 +112,22 @@ export class MiniSQSClient { }, []); } - async sendMessage (message: SendMessage) { - return this.SQSRequest(message, "SendMessage"); + async sendMessage (queueARN: string, message: SendMessage) { + const queueSettings = this.getQueueARN(queueARN); + return this.SQSRequest(message, "SendMessage", queueSettings); } - async sendMessageBatch (messages: SendMessageBatchItem[]) { + async sendMessageBatch (queueARN: string, messages: SendMessageBatchItem[]) { if(!Array.isArray(messages)){ throw new Error("messages must be an array"); } + const queueSettings = this.getQueueARN(queueARN); const messagesChunks = this.splitArrayMessages(messages); const responses = {} as SendMessageBatchResult; for(const messagesChunk of messagesChunks){ const responseChunk = await this.SQSRequest<{Entries:SendMessageBatchItem[]}, SendMessageBatchResult>({ Entries: messagesChunk - }, "SendMessageBatch"); + }, "SendMessageBatch", queueSettings); if(responseChunk.Failed ) { if(!responses.Failed) responses.Failed = []; responses.Failed.push(...responseChunk.Failed); @@ -134,17 +140,19 @@ export class MiniSQSClient { return responses; } - async deleteMessage (receiptHandle: string) { + async deleteMessage (queueARN: string, receiptHandle: string) { + const queueSettings = this.getQueueARN(queueARN); await this.SQSRequest<{ReceiptHandle:string}, boolean>({ ReceiptHandle: receiptHandle - }, "DeleteMessage", false); + }, "DeleteMessage", queueSettings, false); return true; } - async deleteMessageBatch (receiptHandles: string[]) { + async deleteMessageBatch (queueARN: string, receiptHandles: string[]) { if(!Array.isArray(receiptHandles)){ throw new Error("receiptHandles must be an array"); } + const queueSettings = this.getQueueARN(queueARN); const receiptHandlesData = receiptHandles.map((receiptHandle) => ({ Id: randomUUID(), ReceiptHandle: receiptHandle @@ -154,7 +162,7 @@ export class MiniSQSClient { Entries: { Id: UUID, ReceiptHandle: string }[] }, boolean>({ Entries: receiptHandlesData - }, "DeleteMessageBatch", false); + }, "DeleteMessageBatch", queueSettings,false); } catch (e) { throw new Error(`Error ${e.message}\n Deleting messages: ${JSON.stringify(receiptHandlesData)}`); @@ -162,8 +170,8 @@ export class MiniSQSClient { return true; } - async receiveMessage (receiveMessage: ReceiveMessage, clientClass = Client) { - const {region, accountId, queueName, host, endpoint} = this.queueSettings; + async receiveMessage (queueARN: string, receiveMessage: ReceiveMessage, clientClass = Client) { + const {region, accountId, queueName, host, endpoint} = this.getQueueARN(queueARN); const receiveBody = JSON.stringify({ ...receiveMessage, WaitTimeSeconds: receiveMessage.WaitTimeSeconds > 20 ? 20 : receiveMessage.WaitTimeSeconds @@ -209,25 +217,27 @@ export class MiniSQSClient { return responseData; } - async changeMessageVisibility (receiptHandle: string, visibilityTimeout: number) { + async changeMessageVisibility (queueARN: string, receiptHandle: string, visibilityTimeout: number) { + const queueSettings = this.getQueueARN(queueARN); await this.SQSRequest<{ReceiptHandle:string, VisibilityTimeout: number}, boolean>({ ReceiptHandle: receiptHandle, VisibilityTimeout: visibilityTimeout - }, "ChangeMessageVisibility", false); + }, "ChangeMessageVisibility", queueSettings,false); return true; } - async changeMessageVisibilityBatch (receiptHandles: string[], visibilityTimeout: number) { + async changeMessageVisibilityBatch (queueARN: string, receiptHandles: string[], visibilityTimeout: number) { if(!Array.isArray(receiptHandles)){ throw new Error("receiptHandles must be an array"); } + const queueSettings = this.getQueueARN(queueARN); await this.SQSRequest<{ Entries: { Id: UUID, ReceiptHandle: string, VisibilityTimeout: number }[] }, boolean>({Entries: receiptHandles.map((receiptHandle) => ({ Id: randomUUID(), ReceiptHandle: receiptHandle, VisibilityTimeout: visibilityTimeout - }))}, "ChangeMessageVisibilityBatch", false); + }))}, "ChangeMessageVisibilityBatch", queueSettings, false); return true; } } diff --git a/test/errors.ts b/test/errors.ts index 62d4b6c..b41eb47 100644 --- a/test/errors.ts +++ b/test/errors.ts @@ -13,7 +13,7 @@ test("MiniSQSClient Errors", async (t) => { const mockAgent = new MockAgent(); mockAgent.disableNetConnect(); const mockPool = mockAgent.get("https://sqs.eu-central-1.amazonaws.com"); - const client = new MiniSQSClient(queueARN, undefined, { + const client = new MiniSQSClient("eu-central-1", undefined, { factory: () => mockPool }); t.context = { @@ -39,7 +39,7 @@ test("MiniSQSClient Errors", async (t) => { }).reply(400, { message: "The request was rejected because the specified queue does not exist or you do not have access to it.", }); - await t.rejects(client.sendMessage(message), Error("The request was rejected because the specified queue does not exist or you do not have access to it.")); + await t.rejects(client.sendMessage(queueARN,message), Error("The request was rejected because the specified queue does not exist or you do not have access to it.")); }); await t.test("sendMessage Error without json message", async (t) => { const { mockPool, client } = t.context; @@ -53,7 +53,7 @@ test("MiniSQSClient Errors", async (t) => { }).reply(500, { error: "Generic Error", }); - await t.rejects(client.sendMessage(message), Error(JSON.stringify({ + await t.rejects(client.sendMessage(queueARN,message), Error(JSON.stringify({ error: "Generic Error", }))); }); @@ -67,17 +67,17 @@ test("MiniSQSClient Errors", async (t) => { method: "POST", body: JSON.stringify(message), }).reply(500, "Generic Error"); - await t.rejects(client.sendMessage(message), Error("Generic Error")); + await t.rejects(client.sendMessage(queueARN,message), Error("Generic Error")); }); await t.test("sendMessageBatch Error", async (t) => { const { client } = t.context; - await t.rejects(client.sendMessageBatch({} as any), Error("messages must be an array")); + await t.rejects(client.sendMessageBatch(queueARN,{} as any), Error("messages must be an array")); }); await t.test("deleteMessageBatch Error no array", async (t) => { const { client } = t.context; - await t.rejects(client.deleteMessageBatch(randomUUID() as any), "messages must be an array"); + await t.rejects(client.deleteMessageBatch(queueARN,randomUUID() as any), "messages must be an array"); }); await t.test("deleteMessageBatch Genreric Error", async (t) => { @@ -89,7 +89,7 @@ test("MiniSQSClient Errors", async (t) => { return headers["x-amz-target"] === "AmazonSQS.DeleteMessageBatch"; } }).reply(500, "Generic Error"); - await t.rejects(client.deleteMessageBatch([randomUUID()])); + await t.rejects(client.deleteMessageBatch(queueARN,[randomUUID()])); }); await t.test("receiveMessage Error", async (t) => { @@ -112,13 +112,13 @@ test("MiniSQSClient Errors", async (t) => { } } - await t.rejects(client.receiveMessage({ + await t.rejects(client.receiveMessage(queueARN,{ WaitTimeSeconds: 20 }, MockClientLocal as any)); }); await t.test("changeMessageVisibilityBatch Error no array", async (t) => { const { client } = t.context; - await t.rejects(client.changeMessageVisibilityBatch(randomUUID() as any, 30), "messages must be an array"); + await t.rejects(client.changeMessageVisibilityBatch(queueARN,randomUUID() as any, 30), "messages must be an array"); }); }); \ No newline at end of file diff --git a/test/index.ts b/test/index.ts index f8ab7d7..fc6fa13 100644 --- a/test/index.ts +++ b/test/index.ts @@ -14,7 +14,7 @@ test("MiniSQSClient", { only: true }, async (t) => { setGlobalDispatcher(mockAgent); mockAgent.disableNetConnect(); const mockPool = mockAgent.get("https://sqs.eu-central-1.amazonaws.com"); - const client = new MiniSQSClient(queueARN, undefined, { + const client = new MiniSQSClient("eu-central-1", undefined, { factory: () => mockPool }); t.context = { @@ -49,13 +49,13 @@ test("MiniSQSClient", { only: true }, async (t) => { return headers["x-amz-target"] === "AmazonSQS.SendMessage"; } }).reply(200, mockResponse); - const result = await client.sendMessage(message); + const result = await client.sendMessage(queueARN, message); t.same(result, mockResponse); }); await t.test("sendMessage Using signer instance", async (t) => { const { mockPool } = t.context; - const client = new MiniSQSClient(queueARN, undefined, { + const client = new MiniSQSClient("eu-central-1", undefined, { factory: () => mockPool }, new Signer()); const message: SendMessage = { @@ -73,12 +73,12 @@ test("MiniSQSClient", { only: true }, async (t) => { return headers["x-amz-target"] === "AmazonSQS.SendMessage"; } }).reply(200, mockResponse); - const result = await client.sendMessage(message); + const result = await client.sendMessage(queueARN,message); t.same(result, mockResponse); }); await t.test("sendMessage Using signer options", async (t) => { const { mockPool } = t.context; - const client = new MiniSQSClient(queueARN, undefined, { + const client = new MiniSQSClient("eu-central-1", undefined, { factory: () => mockPool }, {minThreads: 1, maxThreads: 1}); const message: SendMessage = { @@ -96,12 +96,12 @@ test("MiniSQSClient", { only: true }, async (t) => { return headers["x-amz-target"] === "AmazonSQS.SendMessage"; } }).reply(200, mockResponse); - const result = await client.sendMessage(message); + const result = await client.sendMessage(queueARN,message); t.same(result, mockResponse); }); await t.test("sendMessage and destroy client", async (t) => { const { mockPool } = t.context; - const client = new MiniSQSClient(queueARN, undefined, { + const client = new MiniSQSClient("eu-central-1", undefined, { factory: () => mockPool }, new Signer()); const message: SendMessage = { @@ -119,7 +119,7 @@ test("MiniSQSClient", { only: true }, async (t) => { return headers["x-amz-target"] === "AmazonSQS.SendMessage"; } }).reply(200, mockResponse); - const result = await client.sendMessage(message); + const result = await client.sendMessage(queueARN,message); t.same(result, mockResponse); await t.resolves(client.destroy()); }); @@ -169,7 +169,7 @@ test("MiniSQSClient", { only: true }, async (t) => { return headers["x-amz-target"] === "AmazonSQS.SendMessageBatch"; } }).reply(200, mockResponse); - const result = await client.sendMessageBatch(messages); + const result = await client.sendMessageBatch(queueARN,messages); t.same(result, mockResponse); }); @@ -226,7 +226,7 @@ test("MiniSQSClient", { only: true }, async (t) => { Failed: [], Successful: responsesChunks[1] }); - const result = await client.sendMessageBatch(messages); + const result = await client.sendMessageBatch(queueARN,messages); t.same(result, mockResponse); }); @@ -243,7 +243,7 @@ test("MiniSQSClient", { only: true }, async (t) => { return headers["x-amz-target"] === "AmazonSQS.DeleteMessage"; } }).reply(200, {}); - await t.resolves(client.deleteMessage(receiptHandle)); + await t.resolves(client.deleteMessage(queueARN,receiptHandle)); }); await t.test("deleteMessageBatch", async (t) => { @@ -264,7 +264,7 @@ test("MiniSQSClient", { only: true }, async (t) => { return headers["x-amz-target"] === "AmazonSQS.DeleteMessageBatch"; } }).reply(200, {}); - await t.resolves(client.deleteMessageBatch(receiptHandles)); + await t.resolves(client.deleteMessageBatch(queueARN,receiptHandles)); }); await t.test("receiveMessage", async (t) => { @@ -296,7 +296,7 @@ test("MiniSQSClient", { only: true }, async (t) => { } } - const messages = await client.receiveMessage({ + const messages = await client.receiveMessage(queueARN,{ WaitTimeSeconds: 20 }, MockClientLocal as any); t.same(messages, { @@ -336,7 +336,7 @@ test("MiniSQSClient", { only: true }, async (t) => { } } - const messages = await client.receiveMessage({ + const messages = await client.receiveMessage(queueARN,{ WaitTimeSeconds: 50 }, MockClientLocal as any); t.same(messages, { @@ -358,7 +358,7 @@ test("MiniSQSClient", { only: true }, async (t) => { return headers["x-amz-target"] === "AmazonSQS.ChangeMessageVisibility"; } }).reply(200, {}); - await t.resolves(client.changeMessageVisibility(receiptHandle, 30)); + await t.resolves(client.changeMessageVisibility(queueARN,receiptHandle, 30)); }); await t.test("changeMessageVisibilityBatch", async (t) => { @@ -381,6 +381,6 @@ test("MiniSQSClient", { only: true }, async (t) => { return headers["x-amz-target"] === "AmazonSQS.ChangeMessageVisibilityBatch"; } }).reply(200, {}); - await t.resolves(client.changeMessageVisibilityBatch(receiptHandles, 30)); + await t.resolves(client.changeMessageVisibilityBatch(queueARN,receiptHandles, 30)); }); }); \ No newline at end of file