Skip to content

Instantly share code, notes, and snippets.

@TotooriaHyperion
Last active February 20, 2021 09:19
Show Gist options
  • Save TotooriaHyperion/0e758558f54ad543047b8db8fbd25921 to your computer and use it in GitHub Desktop.
Save TotooriaHyperion/0e758558f54ad543047b8db8fbd25921 to your computer and use it in GitHub Desktop.
react - rxjs & reactive
import ReactDOM from "react-dom";
import { Observable } from "rxjs";
// Wrap every observable [source] with this (e.g. from(), Subject, fromEvent(), fromFetch())
// so that all subscribers run inside unstable_batchedUpdates.
/**
 * Returns an observable that mirrors `obs`, delivering every `next` value to
 * the downstream observer from inside `ReactDOM.unstable_batchedUpdates`.
 * `error` and `complete` notifications are forwarded directly, without batching.
 *
 * @param obs The source observable to mirror.
 * @returns A new observable that subscribes to `obs` on each subscription.
 */
export const reactBatch = <T>(obs: Observable<T>): Observable<T> => {
  const batched = new Observable<T>((downstream) => {
    // In a test environment (unit-testing a model), unstable_batchedUpdates
    // needs to be replaced with a direct call.
    const forwardNext = (value: T) =>
      ReactDOM.unstable_batchedUpdates(() => downstream.next(value));
    const forwardError = (err: unknown) => downstream.error(err);
    const forwardComplete = () => downstream.complete();

    // Returning the inner subscription ties its teardown to the outer one.
    return obs.subscribe({
      next: forwardNext,
      error: forwardError,
      complete: forwardComplete,
    });
  });
  return batched;
};
import { useDebugValue, useMemo } from "react";
import { Observable, Subscription as RxSubscription } from "rxjs";
import { Subscription, useSubscription } from "use-subscription";
/**
 * React hook that tracks the latest value of each given observable and
 * returns them as an array in argument order. A slot is `undefined` until
 * its observable has emitted.
 *
 * Initial values are probed by subscribing to each observable and
 * unsubscribing immediately, so only synchronous emitters contribute an
 * initial value.
 *
 * @param obs The observables to track; they are also used as the `useMemo`
 *   dependency list, so the subscription is rebuilt when any of them changes.
 * @returns The current snapshot of values, one slot per observable.
 */
export function useObservables<T extends [...Observable<any>[]]>(
  ...obs: T
): {
  [key in keyof T]: T[key] extends Observable<infer V> ? V | undefined : never;
} {
  const subscription = useMemo<Subscription<any>>(() => {
    let dirty = false;
    // Private working array: mutated on every emission, never handed out.
    const value: any[] = [];
    // get initial value if possible
    obs.forEach((ob, idx) => {
      ob.subscribe((v) => {
        value[idx] = v;
      }).unsubscribe();
    });
    // Snapshot returned to callers. It is only ever replaced, never mutated,
    // so an array that was already handed out cannot change underneath its
    // consumer when a later emission arrives.
    let result: any[] = [...value];
    return {
      subscribe: (cb) => {
        const sub = new RxSubscription();
        obs.forEach((ob, idx) => {
          sub.add(
            ob.subscribe((v) => {
              if (v !== value[idx]) {
                value[idx] = v;
                // mark dirty so the next read produces a fresh snapshot
                dirty = true;
                // trigger update
                cb();
              }
            }),
          );
        });
        return () => sub.unsubscribe();
      },
      getCurrentValue: () => {
        if (dirty) {
          // for immutability: publish a new array instead of mutating the old one
          result = [...value];
          dirty = false;
        }
        return result;
      },
    };
  }, obs);
  const result = useSubscription(subscription);
  useDebugValue(result);
  return result;
}
import { useDebugValue, useMemo } from "react";
import { Observable, Subscription as RxSubscription } from "rxjs";
import { Subscription, useSubscription } from "use-subscription";
/**
 * React hook that tracks the latest value of each given observable and
 * returns them as an array in argument order. A slot is `undefined` until
 * its observable has emitted.
 *
 * The underlying observables are subscribed lazily, on the first call to
 * either `getCurrentValue` or `subscribe`, and are released by the
 * unsubscribe function returned from `subscribe`.
 *
 * @param obs The observables to track; they are also used as the `useMemo`
 *   dependency list, so the subscription is rebuilt when any of them changes.
 * @returns The current snapshot of values, one slot per observable.
 */
export function useObservables<T extends [...Observable<any>[]]>(
  ...obs: T
): {
  [key in keyof T]: T[key] extends Observable<infer V> ? V | undefined : never;
} {
  const source = useMemo<Subscription<any>>(() => {
    // Working array mutated by emissions; never handed out directly.
    let latest: any = [];
    // Snapshot returned to callers; replaced (not mutated) when stale.
    let snapshot: any = [];
    let stale = false;
    let notify: (() => void) | null = null;
    let connection: RxSubscription | null = null;
    let active = false;

    // Perform the actual subscription to every observable (at most once
    // per activation).
    const connect = (): void => {
      if (connection) {
        return;
      }
      const group = new RxSubscription();
      connection = group;
      obs.forEach((ob, idx) => {
        const inner = ob.subscribe((v) => {
          if (v === latest[idx]) {
            return;
          }
          latest[idx] = v;
          // mark the snapshot as out of date
          stale = true;
          // trigger update
          notify?.();
        });
        group.add(inner);
      });
    };

    // Activate the subscription.
    // Why? We do not want to do subscribe().unsubscribe() inside
    // getCurrentValue, because that affects the connect behaviour of rxjs
    // observables. Per the original author's note, use-subscription calls
    // getCurrentValue to read the current value before it subscribes, so
    // getCurrentValue must also trigger the subscription, while the whole
    // lifecycle should still perform exactly one subscribe and one
    // unsubscribe. Both entry points therefore go through this guard.
    const ensureActive = (): void => {
      if (active) {
        return;
      }
      active = true;
      connect();
    };

    // Release the subscription and reset the activation state.
    const teardown = (): void => {
      connection?.unsubscribe();
      connection = null;
      notify = null;
      active = false;
    };

    return {
      getCurrentValue: () => {
        ensureActive();
        if (stale) {
          // for immutability: publish a fresh array
          snapshot = [...latest];
          stale = false;
        }
        return snapshot;
      },
      subscribe: (cb) => {
        ensureActive();
        notify = cb;
        return teardown;
      },
    };
  }, obs);
  const current = useSubscription(source);
  useDebugValue(current);
  return current;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment