Skip to content

Commit

Permalink
Merge branch 'master' into malik.9063.granular-topic-gating
Browse files Browse the repository at this point in the history
  • Loading branch information
mzparacha committed Oct 17, 2024
2 parents a2009af + 163e9c2 commit 669480e
Show file tree
Hide file tree
Showing 41 changed files with 535 additions and 319 deletions.
2 changes: 1 addition & 1 deletion Dockerfile.datadog
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ RUN apt-get update && apt-get -y install --reinstall datadog-agent
# Expose DogStatsD and trace-agent ports
EXPOSE 8125/udp 8126/tcp

COPY datadog-config/ /etc/datadog-agent/
RUN mkdir -p /var/run/datadog
Empty file removed datadog-config/datadog.yaml
Empty file.
39 changes: 36 additions & 3 deletions libs/adapters/src/trpc/handlers.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import {
CacheNamespaces,
Events,
INVALID_ACTOR_ERROR,
INVALID_INPUT_ERROR,
INVALID_STATE_ERROR,
cache,
command as coreCommand,
query as coreQuery,
handleEvent,
logger,
type EventSchemas,
type EventsHandlerMetadata,
type Metadata,
Expand All @@ -14,6 +17,8 @@ import { TRPCError } from '@trpc/server';
import { ZodSchema, ZodUndefined, z } from 'zod';
import { Commit, Tag, Track, buildproc, procedure } from './middleware';

const log = logger(import.meta);

const trpcerror = (error: unknown): TRPCError => {
if (error instanceof Error) {
const { name, message, ...other } = error;
Expand Down Expand Up @@ -89,6 +94,7 @@ export const command = <
* @param factory query factory
* @param tag query tag used for OpenAPI spec grouping
* @param forceSecure whether to force secure requests for rate-limited external-router
* @param ttlSecs cache response ttl in seconds
* @returns tRPC query procedure
*/
export const query = <
Expand All @@ -98,25 +104,52 @@ export const query = <
>(
factory: () => Metadata<Input, Output, AuthContext>,
tag: Tag,
forceSecure?: boolean,
options?: {
forceSecure?: boolean;
ttlSecs?: number;
},
) => {
const md = factory();
return buildproc({
method: 'GET',
name: factory.name,
md,
tag,
forceSecure,
forceSecure: options?.forceSecure,
}).query(async ({ ctx, input }) => {
try {
return await coreQuery(
const cacheKey = options?.ttlSecs
? `${factory.name}_${JSON.stringify(input)}`
: undefined;
if (cacheKey) {
const cachedReponse = await cache().getKey(
CacheNamespaces.Query_Response,
cacheKey,
);
if (cachedReponse) {
log.info(`Returning cached response for ${cacheKey}`);
return JSON.parse(cachedReponse);
}
}
const response = await coreQuery(
md,
{
actor: ctx.actor,
payload: input!,
},
false,
);
if (cacheKey) {
void cache()
.setKey(
CacheNamespaces.Query_Response,
cacheKey,
JSON.stringify(response),
options?.ttlSecs,
)
.then(() => log.info(`Cached response for ${cacheKey}`));
}
return response;
} catch (error) {
throw trpcerror(error);
}
Expand Down
1 change: 1 addition & 0 deletions libs/core/src/ports/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ export enum CacheNamespaces {
Activity_Cache = 'activity_cache',
Rate_Limiter = 'rate_limiter',
Api_key_auth = 'api_key_auth',
Query_Response = 'query_response',
}

/**
Expand Down
5 changes: 2 additions & 3 deletions libs/model/src/feed/GetGlobalActivity.query.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import { Query } from '@hicommonwealth/core';
import * as schemas from '@hicommonwealth/schemas';
import { GlobalActivityCache } from '../globalActivityCache';
import { getUserActivityFeed } from '../getUserActivityFeed';

export function GetGlobalActivity(): Query<typeof schemas.ActivityFeed> {
return {
...schemas.ActivityFeed,
auth: [],
secure: false,
body: async () =>
await GlobalActivityCache.getInstance().getGlobalActivity(),
body: async () => await getUserActivityFeed({}),
};
}
2 changes: 1 addition & 1 deletion libs/model/src/feed/GetUserActivity.query.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Query } from '@hicommonwealth/core';
import * as schemas from '@hicommonwealth/schemas';
import { getUserActivityFeed } from '../globalActivityCache';
import { getUserActivityFeed } from '../getUserActivityFeed';

export function GetUserActivity(): Query<typeof schemas.ActivityFeed> {
return {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import { CacheNamespaces, cache, logger } from '@hicommonwealth/core';
import { ActivityFeed, ActivityThread } from '@hicommonwealth/schemas';
import { QueryTypes } from 'sequelize';
import { v4 as uuidv4 } from 'uuid';
import { z } from 'zod';
import { models } from './database';

Expand All @@ -15,9 +13,7 @@ export async function getUserActivityFeed({
user_id = 0,
thread_limit = 50,
comment_limit = 3,
}: Omit<z.infer<typeof ActivityFeed.input>, 'is_global'> & {
user_id?: number;
}) {
}: z.infer<typeof ActivityFeed.input> & { user_id?: number }) {
const query = `
WITH
user_communities AS (
Expand Down Expand Up @@ -114,124 +110,3 @@ ORDER BY

return threads.map((t) => t.thread);
}

const log = logger(import.meta);

export class GlobalActivityCache {
private _cacheKey = 'global_activity';
private _lockName = 'global_activity_cache_locker';
private static _instance: GlobalActivityCache;

constructor(
private _cacheTTL: number = 60 * 5, // cache TTL in seconds
) {}

static getInstance(cacheTTL?: number): GlobalActivityCache {
if (!GlobalActivityCache._instance) {
GlobalActivityCache._instance = new GlobalActivityCache(cacheTTL);
}
return GlobalActivityCache._instance;
}

public async start() {
await this.refreshGlobalActivity();
// eslint-disable-next-line @typescript-eslint/no-misused-promises
setInterval(this.refreshGlobalActivity.bind(this), this._cacheTTL * 1000);
}

public async getGlobalActivity() {
const activity = await cache().getKey(
CacheNamespaces.Activity_Cache,
this._cacheKey,
);
if (!activity) {
if (GlobalActivityCache._instance) {
const msg = 'Failed to fetch global activity from Redis';
log.error(msg);
}
return await getUserActivityFeed({});
}
return JSON.parse(activity);
}

public async deleteActivityFromCache(threadId: number): Promise<void> {
const errorMsg = 'Failed to update global activity in Redis';

try {
const res = await cache().getKey(
CacheNamespaces.Activity_Cache,
this._cacheKey,
);

if (!res) {
log.info('Global Activity Cache is empty');
return;
}

let activity = JSON.parse(res);
let updated = false;
activity = activity.filter((a: any) => {
let shouldKeep = true;
if (a.thread_id === threadId) {
updated = true;
shouldKeep = false;
}
return shouldKeep;
});

if (!updated) return;

const result = await cache().setKey(
CacheNamespaces.Activity_Cache,
this._cacheKey,
JSON.stringify(activity),
);
if (!result) {
log.error(errorMsg);
}
} catch (e: any) {
log.error(errorMsg, e);
}
}

private async refreshGlobalActivity(): Promise<void> {
try {
const lockAcquired = await this.acquireLock();

if (lockAcquired === false) {
log.info('Unable to acquire lock. Skipping refresh...');
return;
}

const activity = await getUserActivityFeed({});
const result = await cache().setKey(
CacheNamespaces.Activity_Cache,
this._cacheKey,
JSON.stringify(activity),
);

if (!result) {
const msg = 'Failed to save global activity in Redis';
log.error(msg);
return;
}

log.info('Activity cache successfully refreshed');
} catch (e: any) {
const msg = 'Failed to refresh the global cache';
log.error(msg, e);
}
}

private async acquireLock() {
return await cache().setKey(
CacheNamespaces.Activity_Cache,
this._lockName,
uuidv4(),
// shorten by 5 seconds to eliminate any discrepancies
// between setInterval delay and Redis TTL
this._cacheTTL - 5,
true,
);
}
}
1 change: 0 additions & 1 deletion libs/model/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,5 @@ export type { E2E_TestEntities } from './tester';
export * from './chainEventSignatures';
export * from './config';
export * from './database';
export * from './globalActivityCache';
export * from './models';
export * from './utils';
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@ const GLOBAL_ACTIVITY_STALE_TIME = 5 * 60 * 1_000; // 5 minutes (backend caches

export const useFetchGlobalActivityQuery = () => {
return trpc.feed.getGlobalActivity.useQuery(
{
thread_limit: 50,
comment_limit: 3,
},
{},
{
staleTime: GLOBAL_ACTIVITY_STALE_TIME,
cacheTime: USER_ACTIVITY_CACHE_TIME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { fetchCachedNodes } from 'state/api/nodes';

interface UseTokenMetadataQueryProps {
tokenId: string;
chainId: number;
nodeEthChainId: number;
apiEnabled?: boolean;
}

Expand All @@ -17,15 +17,17 @@ export type GetTokenMetadataResponse = {

const getTokenMetadata = async ({
tokenId,
chainId,
nodeEthChainId,
}: UseTokenMetadataQueryProps): Promise<GetTokenMetadataResponse> => {
const ethereumNode = fetchCachedNodes()?.find((n) => n?.id === chainId);
const node = fetchCachedNodes()?.find(
(n) => n?.ethChainId === nodeEthChainId,
);

if (!ethereumNode) {
throw new Error('Ethereum node not found');
if (!node) {
throw new Error('Node not found');
}

const response = await axios.post(ethereumNode.url, {
const response = await axios.post(node.url, {
params: [tokenId],
method: 'alchemy_getTokenMetadata',
});
Expand All @@ -35,12 +37,12 @@ const getTokenMetadata = async ({

const useTokenMetadataQuery = ({
tokenId,
chainId,
nodeEthChainId,
apiEnabled = true,
}: UseTokenMetadataQueryProps) => {
return useQuery({
queryKey: [tokenId, chainId],
queryFn: () => getTokenMetadata({ tokenId, chainId }),
queryKey: [tokenId, nodeEthChainId],
queryFn: () => getTokenMetadata({ tokenId, nodeEthChainId }),
enabled: !!tokenId && apiEnabled,
retry: false,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ import { useTokenMetadataQuery } from 'state/api/tokens';
import { useDebounce } from 'usehooks-ts';

type UseTokenFinderProps = {
chainId: number;
nodeEthChainId: number;
};

const useTokenFinder = ({ chainId }: UseTokenFinderProps) => {
const useTokenFinder = ({ nodeEthChainId }: UseTokenFinderProps) => {
const [tokenValue, setTokenValue] = useState('');
const debouncedTokenValue = useDebounce<string>(tokenValue, 500);

const { data: tokenMetadata, isLoading: tokenMetadataLoading } =
useTokenMetadataQuery({
tokenId: debouncedTokenValue,
chainId,
nodeEthChainId,
});

const getTokenError = () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const useFundContestForm = ({
}: UseFundContestFormProps) => {
const [tokenAmount, setTokenAmount] = useState(INITIAL_AMOUNT);
const { data: tokenMetadata } = useTokenMetadataQuery({
chainId: chainNodeId,
nodeEthChainId: ethChainId,
tokenId: fundingTokenAddress || '',
});
const { data: tokenUsdRateData } = useFetchTokenUsdRateQuery({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ const ManageContest = ({ contestAddress }: ManageContestProps) => {
contestAddress,
});

const chainId = app.chain.meta.ChainNode?.id || 0;
const nodeEthChainId = app.chain.meta.ChainNode?.eth_chain_id || 0;
const { data: tokenMetadata } = useTokenMetadataQuery({
tokenId: contestFormData?.fundingTokenAddress || '',
chainId,
nodeEthChainId,
apiEnabled: !!contestFormData?.fundingTokenAddress,
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ const DetailsFormStep = ({

const { mutateAsync: updateContest } = useUpdateContestMutation();

const chainId = app.chain.meta.ChainNode?.id || 0;
const {
tokenValue,
setTokenValue,
Expand All @@ -106,7 +105,7 @@ const DetailsFormStep = ({
tokenMetadata,
tokenMetadataLoading,
} = useTokenFinder({
chainId: chainId,
nodeEthChainId: app.chain.meta.ChainNode?.eth_chain_id || 0,
});

const communityId = app.activeChainId() || '';
Expand Down
Loading

0 comments on commit 669480e

Please sign in to comment.