Skip to content

Commit

Permalink
Merge pull request #1 from fgiova/feat/redesign-multiqueue
Browse files Browse the repository at this point in the history
feat!: Support multiple queues whit one client for region/account
  • Loading branch information
fgiova committed Nov 15, 2023
2 parents c5644bd + 845b932 commit ae9c61c
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 66 deletions.
26 changes: 13 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ npm install @fgiova/mini-sqs-client
import {MiniSQSClient} from '@fgiova/mini-sqs-client'
import console = require("console");

const client = new MiniSQSClient("arn:aws:sqs:eu-central-1:000000000000:test");
const client = new MiniSQSClient("eu-central-1");

await client.sendMessage({
await client.sendMessage("arn:aws:sqs:eu-central-1:000000000000:test", {
MessageBody: "Hello world",
MessageAttributes: {
"my-attribute": {
Expand All @@ -36,35 +36,35 @@ await client.sendMessage({
}
});

const messages = await client.receiveMessage({
const messages = await client.receiveMessage("arn:aws:sqs:eu-central-1:000000000000:test", {
WaitTimeSeconds: 20,
MaxNumberOfMessages: 1,
MessageAttributeNames: ["my-attribute"]
});

const message = messages[0];

await client.changeMessageVisibility({
await client.changeMessageVisibility("arn:aws:sqs:eu-central-1:000000000000:test", {
ReceiptHandle: message.ReceiptHandle,
VisibilityTimeout: 10
});

console.log(message.Body);

await client.deleteMessage(message.ReceiptHandle);
await client.deleteMessage("arn:aws:sqs:eu-central-1:000000000000:test", message.ReceiptHandle);
```

## API

```typescript
MiniSQSClient(queueARN: string, endpoint?: string, undiciOptions?: Pool.Options, signer?: Signer | SignerOptions)
MiniSQSClient.sendMessage(message: SendMessage): Promise<SendMessageResult>
MiniSQSClient.sendMessageBatch(messages: SendMessage[]): Promise<SendMessageBatchResult>
MiniSQSClient.receiveMessage(options: ReceiveMessage): Promise<ReceiveMessageResult>
MiniSQSClient.deleteMessage(receiptHandle: string): Promise<boolean>
MiniSQSClient.deleteMessageBatch(receiptHandles: string[]): Promise<boolean>
MiniSQSClient.changeMessageVisibility(receiptHandle: string, visibilityTimeout: number): Promise<boolean>
MiniSQSClient.changeMessageVisibilityBatch(receiptHandles: string[], visibilityTimeout: number): Promise<boolean>
MiniSQSClient(region: string, endpoint?: string, undiciOptions?: Pool.Options, signer?: Signer | SignerOptions)
MiniSQSClient.sendMessage(queueARN: string, message: SendMessage): Promise<SendMessageResult>
MiniSQSClient.sendMessageBatch(queueARN: string, messages: SendMessage[]): Promise<SendMessageBatchResult>
MiniSQSClient.receiveMessage(queueARN: string, options: ReceiveMessage): Promise<ReceiveMessageResult>
MiniSQSClient.deleteMessage(queueARN: string, receiptHandle: string): Promise<boolean>
MiniSQSClient.deleteMessageBatch(queueARN: string, receiptHandles: string[]): Promise<boolean>
MiniSQSClient.changeMessageVisibility(queueARN: string, receiptHandle: string, visibilityTimeout: number): Promise<boolean>
MiniSQSClient.changeMessageVisibilityBatch(queueARN: string, receiptHandles: string[], visibilityTimeout: number): Promise<boolean>
```

All types are defined in [schemas.ts](./src/schemas.ts) and are derived from the [AWS SQS API](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_Operations.html) <br />
Expand Down
66 changes: 38 additions & 28 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,17 @@ import type {
} from "./schemas";

export class MiniSQSClient {
private readonly queueSettings: {
region: string,
accountId: string,
queueName: string,
host: string,
endpoint: string
};
private readonly pool: Pool;
private readonly undiciOptions: Pool.Options;
private readonly signer: Signer;
constructor (queueARN: string, endpoint?: string, undiciOptions?: Pool.Options, signer?: Signer | SignerOptions) {
private region: string;
private endpoint: string;

constructor (region: string, endpoint?: string, undiciOptions?: Pool.Options, signer?: Signer | SignerOptions) {
this.undiciOptions = undiciOptions;
this.queueSettings = this.getQueueARN(queueARN, endpoint);
this.pool = new Pool(this.queueSettings.endpoint, undiciOptions);
this.region = region;
this.endpoint = endpoint ?? `https://sqs.${region}.amazonaws.com`;
this.pool = new Pool(this.endpoint, undiciOptions);
if (signer instanceof Signer) {
this.signer = signer;
}
Expand All @@ -42,9 +39,10 @@ export class MiniSQSClient {
]);
}

private getQueueARN (queueARN: string, endpoint?: string) {
private getQueueARN (queueARN: string) {
const [queueName, accountId, region] = queueARN.split(":").reverse();
endpoint = endpoint ?? `https://sqs.${region}.amazonaws.com`;
if (region !== this.region) throw new Error(`Region ${region} does not match ${this.region}`);
const endpoint = this.endpoint;
const url = new URL(endpoint);
return {
region,
Expand All @@ -54,8 +52,14 @@ export class MiniSQSClient {
endpoint
}
}
private async SQSRequest<B,R>(body: B, target: SQSTarget, JSONResponse= true) {
const {region, accountId, queueName, host} = this.queueSettings;
private async SQSRequest<B,R>(body: B, target: SQSTarget, queueSettings: {
region: string,
accountId: string,
queueName: string,
host: string,
endpoint: string
}, JSONResponse= true) {
const {region, accountId, queueName, host} = queueSettings;
const requestBody = JSON.stringify({
...body
});
Expand Down Expand Up @@ -108,20 +112,22 @@ export class MiniSQSClient {
}, []);
}

async sendMessage (message: SendMessage) {
return this.SQSRequest<SendMessage, SendMessageResult>(message, "SendMessage");
async sendMessage (queueARN: string, message: SendMessage) {
const queueSettings = this.getQueueARN(queueARN);
return this.SQSRequest<SendMessage, SendMessageResult>(message, "SendMessage", queueSettings);
}

async sendMessageBatch (messages: SendMessageBatchItem[]) {
async sendMessageBatch (queueARN: string, messages: SendMessageBatchItem[]) {
if(!Array.isArray(messages)){
throw new Error("messages must be an array");
}
const queueSettings = this.getQueueARN(queueARN);
const messagesChunks = this.splitArrayMessages(messages);
const responses = {} as SendMessageBatchResult;
for(const messagesChunk of messagesChunks){
const responseChunk = await this.SQSRequest<{Entries:SendMessageBatchItem[]}, SendMessageBatchResult>({
Entries: messagesChunk
}, "SendMessageBatch");
}, "SendMessageBatch", queueSettings);
if(responseChunk.Failed ) {
if(!responses.Failed) responses.Failed = [];
responses.Failed.push(...responseChunk.Failed);
Expand All @@ -134,17 +140,19 @@ export class MiniSQSClient {
return responses;
}

async deleteMessage (receiptHandle: string) {
async deleteMessage (queueARN: string, receiptHandle: string) {
const queueSettings = this.getQueueARN(queueARN);
await this.SQSRequest<{ReceiptHandle:string}, boolean>({
ReceiptHandle: receiptHandle
}, "DeleteMessage", false);
}, "DeleteMessage", queueSettings, false);
return true;
}

async deleteMessageBatch (receiptHandles: string[]) {
async deleteMessageBatch (queueARN: string, receiptHandles: string[]) {
if(!Array.isArray(receiptHandles)){
throw new Error("receiptHandles must be an array");
}
const queueSettings = this.getQueueARN(queueARN);
const receiptHandlesData = receiptHandles.map((receiptHandle) => ({
Id: randomUUID(),
ReceiptHandle: receiptHandle
Expand All @@ -154,16 +162,16 @@ export class MiniSQSClient {
Entries: { Id: UUID, ReceiptHandle: string }[]
}, boolean>({
Entries: receiptHandlesData
}, "DeleteMessageBatch", false);
}, "DeleteMessageBatch", queueSettings,false);
}
catch (e) {
throw new Error(`Error ${e.message}\n Deleting messages: ${JSON.stringify(receiptHandlesData)}`);
}
return true;
}

async receiveMessage (receiveMessage: ReceiveMessage, clientClass = Client) {
const {region, accountId, queueName, host, endpoint} = this.queueSettings;
async receiveMessage (queueARN: string, receiveMessage: ReceiveMessage, clientClass = Client) {
const {region, accountId, queueName, host, endpoint} = this.getQueueARN(queueARN);
const receiveBody = JSON.stringify({
...receiveMessage,
WaitTimeSeconds: receiveMessage.WaitTimeSeconds > 20 ? 20 : receiveMessage.WaitTimeSeconds
Expand Down Expand Up @@ -209,25 +217,27 @@ export class MiniSQSClient {
return responseData;
}

async changeMessageVisibility (receiptHandle: string, visibilityTimeout: number) {
async changeMessageVisibility (queueARN: string, receiptHandle: string, visibilityTimeout: number) {
const queueSettings = this.getQueueARN(queueARN);
await this.SQSRequest<{ReceiptHandle:string, VisibilityTimeout: number}, boolean>({
ReceiptHandle: receiptHandle,
VisibilityTimeout: visibilityTimeout
}, "ChangeMessageVisibility", false);
}, "ChangeMessageVisibility", queueSettings,false);
return true;
}

async changeMessageVisibilityBatch (receiptHandles: string[], visibilityTimeout: number) {
async changeMessageVisibilityBatch (queueARN: string, receiptHandles: string[], visibilityTimeout: number) {
if(!Array.isArray(receiptHandles)){
throw new Error("receiptHandles must be an array");
}
const queueSettings = this.getQueueARN(queueARN);
await this.SQSRequest<{
Entries: { Id: UUID, ReceiptHandle: string, VisibilityTimeout: number }[]
}, boolean>({Entries: receiptHandles.map((receiptHandle) => ({
Id: randomUUID(),
ReceiptHandle: receiptHandle,
VisibilityTimeout: visibilityTimeout
}))}, "ChangeMessageVisibilityBatch", false);
}))}, "ChangeMessageVisibilityBatch", queueSettings, false);
return true;
}
}
Expand Down
18 changes: 9 additions & 9 deletions test/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ test("MiniSQSClient Errors", async (t) => {
const mockAgent = new MockAgent();
mockAgent.disableNetConnect();
const mockPool = mockAgent.get("https://sqs.eu-central-1.amazonaws.com");
const client = new MiniSQSClient(queueARN, undefined, {
const client = new MiniSQSClient("eu-central-1", undefined, {
factory: () => mockPool
});
t.context = {
Expand All @@ -39,7 +39,7 @@ test("MiniSQSClient Errors", async (t) => {
}).reply(400, {
message: "The request was rejected because the specified queue does not exist or you do not have access to it.",
});
await t.rejects(client.sendMessage(message), Error("The request was rejected because the specified queue does not exist or you do not have access to it."));
await t.rejects(client.sendMessage(queueARN,message), Error("The request was rejected because the specified queue does not exist or you do not have access to it."));
});
await t.test("sendMessage Error without json message", async (t) => {
const { mockPool, client } = t.context;
Expand All @@ -53,7 +53,7 @@ test("MiniSQSClient Errors", async (t) => {
}).reply(500, {
error: "Generic Error",
});
await t.rejects(client.sendMessage(message), Error(JSON.stringify({
await t.rejects(client.sendMessage(queueARN,message), Error(JSON.stringify({
error: "Generic Error",
})));
});
Expand All @@ -67,17 +67,17 @@ test("MiniSQSClient Errors", async (t) => {
method: "POST",
body: JSON.stringify(message),
}).reply(500, "Generic Error");
await t.rejects(client.sendMessage(message), Error("Generic Error"));
await t.rejects(client.sendMessage(queueARN,message), Error("Generic Error"));
});

await t.test("sendMessageBatch Error", async (t) => {
const { client } = t.context;
await t.rejects(client.sendMessageBatch({} as any), Error("messages must be an array"));
await t.rejects(client.sendMessageBatch(queueARN,{} as any), Error("messages must be an array"));
});

await t.test("deleteMessageBatch Error no array", async (t) => {
const { client } = t.context;
await t.rejects(client.deleteMessageBatch(randomUUID() as any), "messages must be an array");
await t.rejects(client.deleteMessageBatch(queueARN,randomUUID() as any), "messages must be an array");
});

await t.test("deleteMessageBatch Genreric Error", async (t) => {
Expand All @@ -89,7 +89,7 @@ test("MiniSQSClient Errors", async (t) => {
return headers["x-amz-target"] === "AmazonSQS.DeleteMessageBatch";
}
}).reply(500, "Generic Error");
await t.rejects(client.deleteMessageBatch([randomUUID()]));
await t.rejects(client.deleteMessageBatch(queueARN,[randomUUID()]));
});

await t.test("receiveMessage Error", async (t) => {
Expand All @@ -112,13 +112,13 @@ test("MiniSQSClient Errors", async (t) => {
}
}

await t.rejects(client.receiveMessage({
await t.rejects(client.receiveMessage(queueARN,{
WaitTimeSeconds: 20
}, MockClientLocal as any));
});

await t.test("changeMessageVisibilityBatch Error no array", async (t) => {
const { client } = t.context;
await t.rejects(client.changeMessageVisibilityBatch(randomUUID() as any, 30), "messages must be an array");
await t.rejects(client.changeMessageVisibilityBatch(queueARN,randomUUID() as any, 30), "messages must be an array");
});
});
Loading

0 comments on commit ae9c61c

Please sign in to comment.