|
import { |
|
EventStoreDBClient, |
|
START, |
|
FORWARDS, |
|
BACKWARDS, |
|
END, |
|
jsonEvent, |
|
eventTypeFilter, |
|
AllStreamResolvedEvent, |
|
AllStreamRecordedEvent, |
|
ReadPosition, |
|
Position, |
|
JSONEventType, |
|
Filter |
|
} from "@eventstore/db-client"; |
|
import * as faker from "faker"; |
|
|
|
type RecommendationEvent = JSONEventType< |
|
"TestRecommendation", |
|
{ |
|
sn: number; |
|
productId: string; |
|
customerCatalogId: string; |
|
customerSKUId: string; |
|
strategyId: string; |
|
strategyName: string; |
|
recommendedPrice: string; |
|
recommendedPriceAt: string; |
|
pricingBoundsTriggered: string; |
|
} |
|
>; |
|
|
|
function logstamp(msg: string): void { |
|
console.log(new Date().toISOString() + ": " + msg); |
|
} |
|
|
|
const regexFilter = process.env.RW_REGEX_FILTER |
|
|
|
let subscribeStartTime: number; |
|
let readPosition: Position; |
|
let lastProcessedEvent: AllStreamResolvedEvent; |
|
let processedEventCount = -1; |
|
|
|
class EsRateWatch { |
|
client: EventStoreDBClient |
|
|
|
constructor(connectionString: string) { |
|
logstamp(`Creating client for ${connectionString}`) |
|
this.client = EventStoreDBClient.connectionString(connectionString); |
|
} |
|
|
|
async start(filter?: Filter | undefined): Promise<void> { |
|
await this.readSpeed(1000); |
|
await this.writeSpeed(1000); |
|
|
|
this.client |
|
.subscribeToAll({ |
|
fromPosition: END, |
|
resolveLinkTos: false, |
|
// filter: filter, |
|
}) |
|
.on("data", this.processData.bind(this)); |
|
} |
|
|
|
report(): void { |
|
const duration = (Date.now() - subscribeStartTime) / 1000; |
|
console.log( |
|
`${new Date().toISOString()}: ${processedEventCount} events rate/s = ${ |
|
processedEventCount / duration |
|
}` |
|
); |
|
processedEventCount = 0; |
|
subscribeStartTime = Date.now(); |
|
} |
|
|
|
async reportLatency(currentEvent: AllStreamResolvedEvent): Promise<void> { |
|
const records = await this.client.readAll({ |
|
direction: BACKWARDS, |
|
fromPosition: END, |
|
maxCount: 1, |
|
}); |
|
if (records.length <= 0) { |
|
console.log("Got no events"); |
|
return; |
|
} |
|
// Get times in milliseconds |
|
const lastEventTime = Math.trunc( |
|
(records[0].event?.created || 0) / 10 ** 4 |
|
); |
|
const currentEventTime = Math.trunc( |
|
(currentEvent.event?.created || 0) / 10 ** 4 |
|
); |
|
const now = Date.now(); |
|
logstamp( |
|
`now(${now}) - lastEvent(${lastEventTime}) = ${now - lastEventTime}ms` |
|
); |
|
logstamp( |
|
`lastEvent(${lastEventTime}) - currentEventTime(${currentEventTime}) = ${ |
|
lastEventTime - currentEventTime |
|
}ms` |
|
); |
|
} |
|
|
|
async readOne(position: ReadPosition): Promise<AllStreamRecordedEvent> { |
|
const records = await this.client.readAll({ |
|
direction: FORWARDS, |
|
fromPosition: position, |
|
maxCount: 1, |
|
}); |
|
if (!records[0]?.event) { |
|
throw new Event("No event found"); |
|
} |
|
return records[0].event; |
|
} |
|
|
|
async reportSomeStreams(count: number, position: ReadPosition, filter?: Filter | undefined): Promise<void> { |
|
let streamCount = 0 |
|
const streams = new Map<string, number>() |
|
await new Promise<void>((resolve): void => { |
|
const subscription = this.client |
|
.subscribeToAll({ |
|
fromPosition: END, |
|
resolveLinkTos: false, |
|
filter: filter |
|
}) |
|
.on("data", async (event: AllStreamResolvedEvent) => { |
|
const streamName = event.event?.streamId || '' |
|
const numPerStream = streams.get(streamName) |
|
if (numPerStream === undefined) { |
|
streams.set(streamName, 0) |
|
} else{ |
|
streams.set(streamName, numPerStream + 1) |
|
} |
|
if (++streamCount == count) { |
|
await subscription.unsubscribe() |
|
resolve() |
|
} |
|
}) |
|
.on("end", () => resolve()); |
|
}) |
|
logstamp(`Found ${streams.size} streams`) |
|
streams.forEach((v, k) => logstamp(k)) |
|
} |
|
|
|
/** |
|
* |
|
* @param count Read backwards n times and report ms |
|
*/ |
|
async readSpeed(count: number): Promise<void> { |
|
const start = Date.now(); |
|
if (!readPosition) readPosition = (await this.readOne(START)).position; |
|
for (let i = 0; i < count; i++) { |
|
try { |
|
readPosition = (await this.readOne(readPosition)).position; |
|
} catch (err) { |
|
logstamp(`Got error reading: ${err.message}`); |
|
} |
|
} |
|
const duration = Date.now() - start; |
|
logstamp( |
|
`Read ${count} event in ${duration}ms. Rate: ${Math.trunc( |
|
duration / count |
|
)}ms` |
|
); |
|
} |
|
|
|
async writeSpeed(count: number): Promise<void> { |
|
const start = Date.now(); |
|
let numWritten = 0; |
|
for (let i = 0; i < count; i++) { |
|
const event = jsonEvent<RecommendationEvent>({ |
|
type: "TestRecommendation", |
|
data: { |
|
sn: count, |
|
productId: faker.random.alphaNumeric(128), |
|
customerCatalogId: faker.random.alphaNumeric(128), |
|
customerSKUId: faker.random.alphaNumeric(128), |
|
strategyId: faker.random.alphaNumeric(128), |
|
strategyName: faker.random.alphaNumeric(256), |
|
recommendedPrice: faker.commerce.price(0, 300, 2, "$"), |
|
recommendedPriceAt: faker.date.recent(0).toISOString(), |
|
pricingBoundsTriggered: "none", |
|
}, |
|
}); |
|
const appendTime = Date.now(); |
|
try { |
|
const appendResult = await this.client.appendToStream("WriteTestStream", [ |
|
event, |
|
]); |
|
if (!appendResult) { |
|
logstamp("Failed to write in speedtest - bailing"); |
|
break; |
|
} |
|
numWritten++; |
|
} catch (err) { |
|
logstamp( |
|
`Got error writing with ${err.message} after ${ |
|
Date.now() - appendTime |
|
}` |
|
); |
|
} |
|
} |
|
const duration = Date.now() - start; |
|
logstamp( |
|
`Wrote ${numWritten} event in ${duration}ms. Rate: ${Math.trunc( |
|
duration / numWritten |
|
)}ms` |
|
); |
|
} |
|
|
|
async processData(event: AllStreamResolvedEvent): Promise<any> { |
|
lastProcessedEvent = event; |
|
if (processedEventCount == -1) { |
|
processedEventCount = 0; |
|
console.log("Got first event - reporting every 1 minutes"); |
|
subscribeStartTime = Date.now(); |
|
await this.reportLatency(lastProcessedEvent); |
|
setInterval(async () => { |
|
this.report(); |
|
await this.reportLatency(lastProcessedEvent); |
|
await this.readSpeed(1000); |
|
await this.writeSpeed(1000); |
|
}, 60 * 1000); |
|
} |
|
++processedEventCount; |
|
} |
|
} |
|
|
|
const watcher = new EsRateWatch(process.env.RW_CONNECTION || "esdb://localhost:2113?Tls=false") |
|
watcher.start() |
|
// watcher.readEventFiltered(10, START, '.*.*$') |