Skip to content

Commit

Permalink
fix: some connection issues
Browse files Browse the repository at this point in the history
  • Loading branch information
schummar committed Oct 7, 2024
1 parent bda177b commit 9754106
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 74 deletions.
6 changes: 3 additions & 3 deletions src/core/commonTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ export interface UpdateFunction<Value> {
(update: Update<Value>): void;
}

export interface AsyncUpdateFunction<Value, From extends any[] = [Value]> {
(update: UpdateFrom<MaybePromise<Value>, From>): void;
export interface AsyncUpdateFunction<Value> {
(update: UpdateFrom<MaybePromise<Value>, [Value]>): void;
}

export interface Use {
Expand All @@ -75,7 +75,7 @@ export interface BaseConnectionActions<T> {
}

export interface AsyncConnectionActions<T> extends BaseConnectionActions<T> {
updateValue: AsyncUpdateFunction<T, [T | undefined]>;
updateValue: AsyncUpdateFunction<T>;
updateError: (error: unknown) => void;
updateIsConnected: (isConnected: boolean) => void;
close: () => void;
Expand Down
97 changes: 60 additions & 37 deletions src/lib/calculatedValue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,59 +90,76 @@ export function calculatedValue<T>(store: Store<T>, notify: () => void): Calcula

const actions: AsyncConnectionActions<any> = {
set(_value) {
if (connection?.active) {
q(() => {
value = _value;
notify();
});
}
q(() => {
if (!connection?.active) {
return;
}

value = _value;
notify();
});
},
updateValue(update) {
if (connection?.active) {
q(async () => {
if (update instanceof Function) {
const currentValue = await (value as Promise<T>).catch(() => undefined);

if (!connection?.active) {
return;
}

try {
update = update(currentValue);
} catch (error) {
value = PromiseWithState.reject(error) as T;
notify();
return;
}
}
q(async () => {
if (!connection?.active) {
return;
}

if (update instanceof Function) {
const currentValue = await value;

if (!connection?.active) {
return;
}

value = PromiseWithState.resolve(update) as T;
notify();
});
}
try {
update = update(currentValue);
} catch (error) {
value = PromiseWithState.reject(error) as T;
notify();
connection.active = false;
connection.cancel?.();
return;
}
}

value = PromiseWithState.resolve(update) as T;
notify();
});
},
updateError(error) {
if (connection?.active) {
q(() => {
value = PromiseWithState.reject(error) as T;
notify();
});
}
q(() => {
if (!connection?.active) {
return;
}

connection.active = false;
connection.cancel?.();

if ('state' in store) {
(store as unknown as Cache<any>).state.set({
status: 'error',
error,
isConnected: false,
isUpdating: false,
isStale: false,
});
}

value = PromiseWithState.reject(error) as T;
notify();
});
},
updateIsConnected(isConnected) {
if (!connection?.active) {
return;
}

if (isConnected) {
whenConnected.resolve();
}

q(() => {
if (!connection?.active) {
return;
}

if ('state' in store) {
(store as unknown as Cache<any>).state.set('isConnected', isConnected);
}
Expand Down Expand Up @@ -175,6 +192,12 @@ export function calculatedValue<T>(store: Store<T>, notify: () => void): Calcula
value = store.getter({ signal: ac.signal, use, connect });
} catch (error) {
value = PromiseWithState.reject(error) as T;

if (connection) {
connection.active = false;
connection.cancel?.();
q.clear();
}
}
} else {
value = store.getter;
Expand Down
36 changes: 2 additions & 34 deletions test/core/cacheWithConnection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ describe('cache with connection', () => {
ws.addEventListener('open', () => updateIsConnected(true));
ws.addEventListener('close', close);
ws.addEventListener('message', (event) =>
updateValue((x = 0) => {
updateValue((x) => {
return x + Number(event.data);
}),
);
Expand Down Expand Up @@ -252,40 +252,8 @@ describe('cache with connection', () => {
status: 'error',
error: new Error('Connection error'),
isUpdating: false,
isConnected: true,
isConnected: false,
},
]);
});

test('reset error by setting new value', async () => {
const cache = createCache<number>(() => async ({ connect }) => {
await connect(({ updateValue, updateError, updateIsConnected }) => {
updateIsConnected(true);
updateError(new Error('Connection error'));
updateValue((x) => x ?? 1);
return () => {};
});

return 0;
});

const subscriber = vi.fn();
cache.subscribe(() => undefined);
cache.state.subscribe(subscriber);

await flushPromises();

expect(subscriber.mock.calls.map((x) => x[0])).toMatchObject([
{ status: 'pending', isStale: true, isUpdating: true, isConnected: false },
{ status: 'value', value: 0, isStale: false, isUpdating: false, isConnected: false },
{ status: 'value', value: 0, isStale: false, isUpdating: false, isConnected: true },
{
status: 'error',
error: new Error('Connection error'),
isUpdating: false,
isConnected: true,
},
{ status: 'value', value: 1, isStale: false, isUpdating: false, isConnected: true },
]);
});
});

0 comments on commit 9754106

Please sign in to comment.