perf(stats): threads, better & parallel queries

This commit is contained in:
Isaac 2025-02-12 01:37:44 +00:00
parent 2ed25f7bbf
commit 6b0146e099
No known key found for this signature in database
GPG Key ID: 0DE40AE37BBA5C33
8 changed files with 230 additions and 106 deletions

View File

@ -1,27 +1,22 @@
/* eslint-disable no-underscore-dangle */ /* eslint-disable no-underscore-dangle */
const {
spawn,
Pool,
Worker,
} = require('threads');
const { cpus } = require('node:os');
const { version } = require('../../package.json'); const { version } = require('../../package.json');
const { md5 } = require('./misc'); const { md5 } = require('./misc');
const {
quick,
relativePool,
} = require('./threads');
// ! ceiL: at least 1 const getAverageTimes = closedTickets => quick('stats', async w => ({
const poolSize = Math.ceil(cpus().length / 4); avgResolutionTime: await w.getAvgResolutionTime(closedTickets),
const pool = Pool(() => spawn(new Worker('./workers/stats.js')), { size: poolSize }); avgResponseTime: await w.getAvgResponseTime(closedTickets),
}));
module.exports.getAvgResolutionTime = tickets => (tickets.reduce((total, ticket) => total + (ticket.closedAt - ticket.createdAt), 0) || 1) / Math.max(tickets.length, 1);
module.exports.getAvgResponseTime = tickets => (tickets.reduce((total, ticket) => total + (ticket.firstResponseAt - ticket.createdAt), 0) || 1) / Math.max(tickets.length, 1);
/** /**
* * Report stats to Houston
* @param {import("../client")} client * @param {import("../client")} client
*/ */
module.exports.sendToHouston = async client => { async function sendToHouston(client) {
const guilds = await client.prisma.guild.findMany({ const guilds = await client.prisma.guild.findMany({
include: { include: {
categories: { include: { _count: { select: { questions: true } } } }, categories: { include: { _count: { select: { questions: true } } } },
@ -35,30 +30,32 @@ module.exports.sendToHouston = async client => {
}, },
}, },
}); });
const users = (await client.prisma.user.aggregate({ const users = await client.prisma.user.aggregate({
_count: true, _count: true,
_sum: { messageCount: true }, _sum: { messageCount: true },
})); });
const messages = users._sum.messageCount ?? 0; const messages = users._sum.messageCount;
const stats = { const stats = {
activated_users: users._count, activated_users: users._count,
arch: process.arch, arch: process.arch,
database: process.env.DB_PROVIDER, database: process.env.DB_PROVIDER,
guilds: await Promise.all( guilds: await Promise.all(
guilds await relativePool(0.25, 'stats', pool =>
.filter(guild => client.guilds.cache.has(guild.id)) guilds
.map(async guild => { .filter(guild => client.guilds.cache.has(guild.id))
guild.members = client.guilds.cache.get(guild.id).memberCount; .map(async guild => {
return pool.queue(worker => worker.aggregateGuildForHouston(guild, messages)); guild.members = client.guilds.cache.get(guild.id).memberCount;
}), return pool.queue(w => w.aggregateGuildForHouston(guild, messages));
}),
),
), ),
id: md5(client.user.id), id: md5(client.user.id),
node: process.version, node: process.version,
os: process.platform, os: process.platform,
version, version,
}; };
const delta = guilds.length - stats.guilds.length; const delta = guilds.length - stats.guilds.length;
if (delta !== 0) { if (delta !== 0) {
client.log.warn('%d guilds are not cached and were excluded from the stats report', delta); client.log.warn('%d guilds are not cached and were excluded from the stats report', delta);
} }
@ -84,3 +81,8 @@ module.exports.sendToHouston = async client => {
client.log.debug(res); client.log.debug(res);
} }
}; };
module.exports = {
getAverageTimes,
sendToHouston,
};

59
src/lib/threads.js Normal file
View File

@ -0,0 +1,59 @@
const {
spawn,
Pool,
Thread,
Worker,
} = require('threads');
const { cpus } = require('node:os');
/**
* Use a thread pool of a fixed size
* @param {number} size number of threads
* @param {string} name name of file in workers directory
* @param {function} fun async function
* @returns {Promise<any>}
*/
async function pool(size, name, fun) {
const pool = Pool(() => spawn(new Worker(`./workers/${name}.js`)), { size });
try {
return await fun(pool);
} finally {
await pool.settled();
await pool.terminate();
}
};
/**
* Spawn one thread, do something, and terminate it
* @param {string} name name of file in workers directory
* @param {function} fun async function
* @returns {Promise<any}
*/
async function quick(name, fun) {
const thread = await spawn(new Worker(`./workers/${name}.js`));
try {
// ! this await is extremely important
return await fun(thread);
} finally {
await Thread.terminate(thread);
}
};
/**
* Use a thread pool of a variable size
* @param {number} size fraction of available CPU cores to use (ceil'd)
* @param {string} name name of file in workers directory
* @param {function} fun async function
* @returns {Promise<any>}
*/
function relativePool(fraction, ...args) {
// ! ceiL: at least 1
const poolSize = Math.ceil(fraction * cpus().length);
return pool(poolSize, ...args);
}
module.exports = {
pool,
quick,
relativePool,
};

View File

@ -20,15 +20,12 @@ const { isStaff } = require('../users');
const { Collection } = require('discord.js'); const { Collection } = require('discord.js');
const spacetime = require('spacetime'); const spacetime = require('spacetime');
const Cryptr = require('cryptr'); const Cryptr = require('cryptr');
const {
getAvgResolutionTime,
getAvgResponseTime,
} = require('../stats');
const { const {
decrypt, decrypt,
encrypt, encrypt,
} = new Cryptr(process.env.ENCRYPTION_KEY); } = new Cryptr(process.env.ENCRYPTION_KEY);
const { getSUID } = require('../logging'); const { getSUID } = require('../logging');
const { getAverageTimes } = require('../stats');
/** /**
* @typedef {import('@prisma/client').Category & * @typedef {import('@prisma/client').Category &
@ -434,9 +431,13 @@ module.exports = class TicketManager {
open: false, open: false,
}, },
}); });
const {
avgResolutionTime,
avgResponseTime,
} = await getAverageTimes(closedTickets);
stats = { stats = {
avgResolutionTime: ms(getAvgResolutionTime(closedTickets), { long: true }), avgResolutionTime: ms(avgResolutionTime, { long: true }),
avgResponseTime: ms(getAvgResponseTime(closedTickets), { long: true }), avgResponseTime: ms(avgResponseTime, { long: true }),
}; };
this.client.keyv.set(statsCacheKey, stats, ms('1h')); this.client.keyv.set(statsCacheKey, stats, ms('1h'));
} }

View File

@ -7,9 +7,13 @@ const md5 = str => createHash('md5').update(str).digest('hex');
const msToMins = ms => Number((ms / 1000 / 60).toFixed(2)); const msToMins = ms => Number((ms / 1000 / 60).toFixed(2));
const getAvgResolutionTime = tickets => (tickets.reduce((total, ticket) => total + (ticket.closedAt - ticket.createdAt), 0) || 1) / Math.max(tickets.length, 1); const reduce = (closedTickets, prop) => closedTickets.reduce((total, ticket) => total + (ticket[prop] - ticket.createdAt), 0) || 1;
const getAvgResponseTime = tickets => (tickets.reduce((total, ticket) => total + (ticket.firstResponseAt - ticket.createdAt), 0) || 1) / Math.max(tickets.length, 1); const getAvgResolutionTime = closedTickets => reduce(closedTickets, 'closedAt') / Math.max(closedTickets.length, 1);
const getAvgResponseTime = closedTickets => reduce(closedTickets, 'firstResponseAt') / Math.max(closedTickets.length, 1);
const sum = numbers => numbers.reduce((t, n) => t + n, 0);
expose({ expose({
aggregateGuildForHouston(guild, messages) { aggregateGuildForHouston(guild, messages) {
@ -37,4 +41,5 @@ expose({
}, },
getAvgResolutionTime, getAvgResolutionTime,
getAvgResponseTime, getAvgResponseTime,
sum,
}); });

View File

@ -1,8 +1,4 @@
const { Listener } = require('@eartharoid/dbf'); const { Listener } = require('@eartharoid/dbf');
const {
getAvgResolutionTime,
getAvgResponseTime,
} = require('../../lib/stats');
const ms = require('ms'); const ms = require('ms');
const sync = require('../../lib/sync'); const sync = require('../../lib/sync');
const checkForUpdates = require('../../lib/updates'); const checkForUpdates = require('../../lib/updates');
@ -13,7 +9,10 @@ const {
ButtonStyle, ButtonStyle,
} = require('discord.js'); } = require('discord.js');
const ExtendedEmbedBuilder = require('../../lib/embed'); const ExtendedEmbedBuilder = require('../../lib/embed');
const { sendToHouston } = require('../../lib/stats'); const {
getAverageTimes,
sendToHouston,
} = require('../../lib/stats');
module.exports = class extends Listener { module.exports = class extends Listener {
constructor(client, options) { constructor(client, options) {
@ -69,11 +68,15 @@ module.exports = class extends Listener {
firstResponseAt: true, firstResponseAt: true,
}, },
}); });
const closedTicketsWithResponse = tickets.filter(t => t.firstResponseAt && t.closedAt);
const closedTickets = tickets.filter(t => t.closedAt); const closedTickets = tickets.filter(t => t.closedAt);
const closedTicketsWithResponse = closedTickets.filter(t => t.firstResponseAt);
const {
avgResolutionTime,
avgResponseTime,
} = await getAverageTimes(closedTicketsWithResponse);
cached = { cached = {
avgResolutionTime: ms(getAvgResolutionTime(closedTicketsWithResponse)), avgResolutionTime: ms(avgResolutionTime),
avgResponseTime: ms(getAvgResponseTime(closedTicketsWithResponse)), avgResponseTime: ms(avgResponseTime),
guilds: client.guilds.cache.size, guilds: client.guilds.cache.size,
openTickets: tickets.length - closedTickets.length, openTickets: tickets.length - closedTickets.length,
totalTickets: tickets.length, totalTickets: tickets.length,

View File

@ -6,10 +6,7 @@ const {
ChannelType: { GuildCategory }, ChannelType: { GuildCategory },
} = require('discord.js'); } = require('discord.js');
const ms = require('ms'); const ms = require('ms');
const { const { getAverageTimes } = require('../../../../../../lib/stats');
getAvgResolutionTime,
getAvgResponseTime,
} = require('../../../../../../lib/stats');
module.exports.get = fastify => ({ module.exports.get = fastify => ({
handler: async req => { handler: async req => {
@ -29,24 +26,40 @@ module.exports.get = fastify => ({
name: true, name: true,
requiredRoles: true, requiredRoles: true,
staffRoles: true, staffRoles: true,
tickets: { where: { open: false } }, tickets: {
select: {
closedAt: true,
createdAt: true,
firstResponseAt: true,
},
where: {
firstResponseAt: { not: null },
open: false,
},
},
}, },
}, },
}, },
where: { id: req.params.guild }, where: { id: req.params.guild },
}); });
categories = categories.map(c => {
const closedTickets = c.tickets.filter(t => t.firstResponseAt && t.closedAt); categories = await Promise.all(
c = { categories.map(async category => {
...c, const {
stats: { avgResolutionTime,
avgResolutionTime: ms(getAvgResolutionTime(closedTickets)), avgResponseTime,
avgResponseTime: ms(getAvgResponseTime(closedTickets)), } = await getAverageTimes(category.tickets);
}, category = {
}; ...category,
delete c.tickets; stats: {
return c; avgResolutionTime: ms(avgResolutionTime),
}); avgResponseTime: ms(avgResponseTime),
},
};
delete category.tickets;
return category;
}),
);
return categories; return categories;
}, },

View File

@ -1,10 +1,7 @@
/* eslint-disable no-underscore-dangle */ /* eslint-disable no-underscore-dangle */
const { logAdminEvent } = require('../../../../../lib/logging.js'); const { logAdminEvent } = require('../../../../../lib/logging.js');
const { iconURL } = require('../../../../../lib/misc'); const { iconURL } = require('../../../../../lib/misc');
const { const { getAverageTimes } = require('../../../../../lib/stats');
getAvgResolutionTime,
getAvgResponseTime,
} = require('../../../../../lib/stats');
const ms = require('ms'); const ms = require('ms');
module.exports.delete = fastify => ({ module.exports.delete = fastify => ({
@ -42,39 +39,61 @@ module.exports.get = fastify => ({
if (!cached) { if (!cached) {
const guild = client.guilds.cache.get(id); const guild = client.guilds.cache.get(id);
const settings = await client.prisma.guild.findUnique({ where: { id } }) ?? const settings =
await client.prisma.guild.findUnique({ where: { id } }) ??
await client.prisma.guild.create({ data: { id } }); await client.prisma.guild.create({ data: { id } });
const categories = await client.prisma.category.findMany({ const [
select: { categories,
_count: { select: { tickets: true } }, tags,
id: true, tickets,
name: true, closedTickets,
}, ] = await Promise.all([
where: { guildId: id }, client.prisma.category.findMany({
}); select: {
const tickets = await client.prisma.ticket.findMany({ _count: { select: { tickets: true } },
select: { id: true,
closedAt: true, name: true,
createdAt: true, },
firstResponseAt: true, where: { guildId: id },
}, }),
where: { guildId: id }, client.prisma.tag.count({ where: { guildId: id } }),
}); client.prisma.ticket.count(),
const closedTickets = tickets.filter(t => t.firstResponseAt && t.closedAt); client.prisma.ticket.findMany({
select: {
closedAt: true,
createdAt: true,
firstResponseAt: true,
},
where: {
firstResponseAt: { not: null },
guildId: id,
open: false,
},
}),
client.prisma.user.aggregate({
_count: true,
_sum: { messageCount: true },
}),
]);
const {
avgResolutionTime,
avgResponseTime,
} = await getAverageTimes(closedTickets);
cached = { cached = {
createdAt: settings.createdAt, createdAt: settings.createdAt,
id: guild.id, id: guild.id,
logo: iconURL(guild), logo: iconURL(guild),
name: guild.name, name: guild.name,
stats: { stats: {
avgResolutionTime: ms(getAvgResolutionTime(closedTickets)), avgResolutionTime: ms(avgResolutionTime),
avgResponseTime: ms(getAvgResponseTime(closedTickets)), avgResponseTime: ms(avgResponseTime),
categories: categories.map(c => ({ categories: categories.map(c => ({
id: c.id, id: c.id,
name: c.name, name: c.name,
tickets: c._count.tickets, tickets: c._count.tickets,
})), })),
tags: await client.prisma.tag.count({ where: { guildId: id } }), tags,
tickets: tickets.length, tickets: tickets.length,
}, },
}; };

View File

@ -1,8 +1,9 @@
const { /* eslint-disable no-underscore-dangle */
getAvgResolutionTime, getAvgResponseTime,
} = require('../../lib/stats');
const ms = require('ms'); const ms = require('ms');
const pkg = require('../../../package.json'); const pkg = require('../../../package.json');
const { getAverageTimes } = require('../../lib/stats');
const { quick } = require('../../lib/threads');
module.exports.get = () => ({ module.exports.get = () => ({
handler: async req => { handler: async req => {
@ -10,40 +11,61 @@ module.exports.get = () => ({
const client = req.routeOptions.config.client; const client = req.routeOptions.config.client;
const cacheKey = 'cache/stats/client'; const cacheKey = 'cache/stats/client';
let cached = await client.keyv.get(cacheKey); let cached = await client.keyv.get(cacheKey);
if (!cached) { if (!cached) {
const tickets = await client.prisma.ticket.findMany({ const [
select: { categories,
closedAt: true, members,
createdAt: true, tags,
firstResponseAt: true, tickets,
}, closedTickets,
}); users,
const closedTickets = tickets.filter(t => t.firstResponseAt && t.closedAt); ] = await Promise.all([
const users = await client.prisma.user.findMany({ select: { messageCount: true } }); client.prisma.category.count(),
// TODO: background quick('stats', w => w.sum(client.guilds.cache.map(g => g.memberCount))),
client.prisma.tag.count(),
client.prisma.ticket.count(),
client.prisma.ticket.findMany({
select: {
closedAt: true,
createdAt: true,
firstResponseAt: true,
},
where: {
firstResponseAt: { not: null },
open: false,
},
}),
client.prisma.user.aggregate({
_count: true,
_sum: { messageCount: true },
}),
]);
const {
avgResolutionTime,
avgResponseTime,
} = await getAverageTimes(closedTickets);
cached = { cached = {
avatar: client.user.avatarURL(), avatar: client.user.avatarURL(),
discriminator: client.user.discriminator, discriminator: client.user.discriminator,
id: client.user.id, id: client.user.id,
public: (process.env.PUBLIC_BOT === 'true'), public: (process.env.PUBLIC_BOT === 'true'),
stats: { stats: {
activatedUsers: users.length, activatedUsers: users._count,
archivedMessages: users.reduce((total, user) => total + user.messageCount, 0), // don't count archivedMessage table rows, they can be deleted archivedMessages: users._sum.messageCount,
avgResolutionTime: ms(getAvgResolutionTime(closedTickets)), avgResolutionTime: ms(avgResolutionTime),
avgResponseTime: ms(getAvgResponseTime(closedTickets)), avgResponseTime: ms(avgResponseTime),
categories: await client.prisma.category.count(), categories,
guilds: client.guilds.cache.size, guilds: client.guilds.cache.size,
members: client.guilds.cache.reduce((t, g) => t + g.memberCount, 0), members,
tags: await client.prisma.tag.count(), tags,
tickets: tickets.length, tickets,
}, },
username: client.user.username, username: client.user.username,
version: pkg.version, version: pkg.version,
}; };
await client.keyv.set(cacheKey, cached, ms('15m')); await client.keyv.set(cacheKey, cached, ms('15m'));
} }
return cached; return cached;
}, },
}); });