Skip to content

Commit

Permalink
adds ability to include or skip specific topics subscribed to
Browse files Browse the repository at this point in the history
  • Loading branch information
Suraj Keshri committed Jul 19, 2024
1 parent f84c9ac commit e3a38d9
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 9 deletions.
7 changes: 7 additions & 0 deletions .changeset/six-waves-hug.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"graphql-eventbus": minor
"graphql-eventbus-google-pubsub": minor
"graphql-eventbus-rabbitmq": minor
---

adds ability to include or skip specific topics subscribed to
3 changes: 3 additions & 0 deletions packages/core/src/GraphQLEventbus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,9 @@ export class GraphQLEventbus {
payload: {};
metadata?: Partial<GraphQLEventbusMetadata>;
}) => {
if (!this.isInitialized) {
throw new Error("The eventbus must be initialized before publishing.");
}
if (!this.publishValidator) {
throw new Error("Publish config not added!");
}
Expand Down
106 changes: 106 additions & 0 deletions packages/core/src/MemoryEventBus.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,110 @@ describe("MemoryEventBus", () => {
await wait(0);
expect(consumeCb).not.toBeCalled();
});
test("only included topics are consumed", async () => {
const consumeCb = jest.fn();
const queries = gql`
query Query1 {
SignUpEvent {
id
}
}
query Query2 {
EntityFlagEvent {
id
groupId
}
}
`;
const subscribeConfig: SubscriberConfig = {
cb: consumeCb,
queries,
schema: publisherSchema,
};
const bus = new MemoryEventBus({
schema: publisherSchema,
subscriber: {
...subscribeConfig,
includeTopics: ["SignUpEvent"],
},
plugins: [LoggingPlugin()],
});
await bus.init();
await bus.publish({
topic: "SignUpEvent",
payload: validator.sample("SignUpEvent").data["SignUpEvent"],
metadata: { test: "data" },
});
await wait(0);
expect(consumeCb).toBeCalledTimes(1);
expect(consumeCb.mock.calls[0][0]).toMatchObject({
payload: {
id: expect.any(String),
},
topic: "SignUpEvent",
// metadata is propagated
metadata: { test: "data" },
});
consumeCb.mockClear();
await bus.publish({
topic: "EntityFlagEvent",
payload: validator.sample("EntityFlagEvent").data["EntityFlagEvent"],
metadata: { "x-request-id": "123" },
});
await wait(0);
expect(consumeCb).not.toBeCalled();
});
test("skipped topics are not consumed", async () => {
const consumeCb = jest.fn();
const queries = gql`
query Query1 {
SignUpEvent {
id
}
}
query Query2 {
EntityFlagEvent {
id
groupId
}
}
`;
const subscribeConfig: SubscriberConfig = {
cb: consumeCb,
queries,
schema: publisherSchema,
};
const bus = new MemoryEventBus({
schema: publisherSchema,
subscriber: {
...subscribeConfig,
skipTopics: ["EntityFlagEvent"],
},
plugins: [LoggingPlugin()],
});
await bus.init();
await bus.publish({
topic: "SignUpEvent",
payload: validator.sample("SignUpEvent").data["SignUpEvent"],
metadata: { test: "data" },
});
await wait(0);
expect(consumeCb).toBeCalledTimes(1);
expect(consumeCb.mock.calls[0][0]).toMatchObject({
payload: {
id: expect.any(String),
},
topic: "SignUpEvent",
// metadata is propagated
metadata: { test: "data" },
});
consumeCb.mockClear();
await bus.publish({
topic: "EntityFlagEvent",
payload: validator.sample("EntityFlagEvent").data["EntityFlagEvent"],
metadata: { "x-request-id": "123" },
});
await wait(0);
expect(consumeCb).not.toBeCalled();
});
});
18 changes: 16 additions & 2 deletions packages/core/src/MemoryEventBus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ export type MemoryEventBusConfig = {
subscriber?: {
cb: EventBusSubscriberCb;
queries: DocumentNode;
allowConsumingNonExistingTopic?: boolean;
skipTopics?: string[];
includeTopics?: string[];
};
plugins?: EventBusPlugin[];
allowPublishNonExistingTopic?: boolean;
Expand All @@ -38,8 +41,19 @@ export class MemoryEventBus {
subscriber: this.config.subscriber
? {
cb: this.config.subscriber.cb,
subscribe: (topics, cb: DataCb) => {
topics.forEach((topic) => {
subscribe: (allTopics, cb: DataCb) => {
let finalTopics = allTopics;
if (this.config.subscriber?.includeTopics?.length) {
finalTopics = allTopics.filter((t) =>
this.config.subscriber?.includeTopics?.includes(t),
);
}
if (this.config.subscriber?.skipTopics?.length) {
finalTopics = allTopics.filter(
(t) => !this.config.subscriber?.skipTopics?.includes(t),
);
}
finalTopics.forEach((topic) => {
this.eventEmitter.on(`message-${topic}`, async (baggage) => {
await cb({
baggage: JSON.parse(baggage),
Expand Down
17 changes: 15 additions & 2 deletions packages/google-pubsub/src/PubSubEventBus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ export type PubSubEventBusConfig = {
schema: GraphQLSchema;
cb: EventBusSubscriberCb;
options?: SubscriptionOptions;
skipTopics?: string[];
includeTopics?: string[];
};
plugins?: EventBusPlugin[];
serviceName: string;
Expand Down Expand Up @@ -61,8 +63,19 @@ export class PubSubEventBus {
subscriber: this.config.subscriber
? {
cb: this.config.subscriber?.cb,
subscribe: async (topics, cb) => {
await topics.reduce(async (acc, topicName) => {
subscribe: async (allTopics, cb) => {
let finalTopics = allTopics;
if (this.config.subscriber?.includeTopics?.length) {
finalTopics = allTopics.filter((t) =>
this.config.subscriber?.includeTopics?.includes(t),
);
}
if (this.config.subscriber?.skipTopics?.length) {
finalTopics = allTopics.filter(
(t) => !this.config.subscriber?.skipTopics?.includes(t),
);
}
await finalTopics.reduce(async (acc, topicName) => {
const [topic] = await this.pubsubClient
.topic(topicName)
.get({ autoCreate: true });
Expand Down
23 changes: 18 additions & 5 deletions packages/rabbitmq/src/RabbitMQEventBus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ export type RabbitMQEventBusConfig = {
schema: GraphQLSchema;
queries: DocumentNode;
cb: EventBusSubscriberCb;
skipTopics?: string[];
includeTopics?: string[];
};
serviceName: string;
plugins?: EventBusPlugin[];
Expand All @@ -35,7 +37,7 @@ export class RabbitMQEventBus {
publisher: config.publisher
? {
schema: config.publisher?.schema,
publish: async args => {
publish: async (args) => {
this.publishChannel?.publish(
EXCHANGE,
args.topic,
Expand All @@ -54,16 +56,27 @@ export class RabbitMQEventBus {
schema: config.subscriber.schema,
queries: config.subscriber.queries,
cb: config.subscriber.cb,
subscribe: (topics, dataCb) => {
subscribe: (allTopics, dataCb) => {
let finalTopics = allTopics;
if (this.config.subscriber?.includeTopics?.length) {
finalTopics = allTopics.filter((t) =>
this.config.subscriber?.includeTopics?.includes(t),
);
}
if (this.config.subscriber?.skipTopics?.length) {
finalTopics = allTopics.filter(
(t) => !this.config.subscriber?.skipTopics?.includes(t),
);
}
this.consumeChannel
?.assertQueue(queueName, {
exclusive: false,
})
.then(() => {
topics.forEach(topic => {
finalTopics.forEach((topic) => {
this.consumeChannel?.bindQueue(queueName, EXCHANGE, topic);
});
this.consumeChannel?.consume(queueName, msg => {
this.consumeChannel?.consume(queueName, (msg) => {
if (msg?.content) {
dataCb({
baggage: JSON.parse(msg.content.toString("utf-8")),
Expand All @@ -72,7 +85,7 @@ export class RabbitMQEventBus {
.then(() => {
this.consumeChannel?.ack(msg);
})
.catch(e => {
.catch((e) => {
this.consumeChannel?.nack(msg);
throw e;
});
Expand Down

0 comments on commit e3a38d9

Please sign in to comment.