123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269 |
// pull-stream and moment are resolved from the server package's node_modules.
const pull = require('../server/node_modules/pull-stream');
const moment = require('../server/node_modules/moment');
const { getConfig } = require('../configs/config-manager.js');

// Value passed as `limit` to every log stream this module opens.
// Falls back to 1000 when the config has no truthy `ssbLogStream.limit`.
const configuredLogLimit = getConfig().ssbLogStream?.limit;
const logLimit = configuredLogLimit ? configuredLogLimit : 1000;
- module.exports = ({ cooler }) => {
- let ssb;
- const openSsb = async () => {
- if (!ssb) ssb = await cooler.open();
- return ssb;
- };
- const fields = [
- 'job_type','title','description','requirements','languages',
- 'job_time','tasks','location','vacants','salary','image',
- 'author','createdAt','updatedAt','status','subscribers'
- ];
- const pickJobFields = (obj = {}) => ({
- job_type: obj.job_type,
- title: obj.title,
- description: obj.description,
- requirements: obj.requirements,
- languages: obj.languages,
- job_time: obj.job_time,
- tasks: obj.tasks,
- location: obj.location,
- vacants: obj.vacants,
- salary: obj.salary,
- image: obj.image,
- author: obj.author,
- createdAt: obj.createdAt,
- updatedAt: obj.updatedAt,
- status: obj.status,
- subscribers: Array.isArray(obj.subscribers) ? obj.subscribers : []
- });
- return {
- type: 'job',
- async createJob(jobData) {
- const ssbClient = await openSsb();
- let blobId = jobData.image;
- if (blobId && /\(([^)]+)\)/.test(blobId)) blobId = blobId.match(/\(([^)]+)\)/)[1];
- const base = pickJobFields(jobData);
- const content = {
- type: 'job',
- ...base,
- image: blobId,
- author: ssbClient.id,
- createdAt: new Date().toISOString(),
- status: 'OPEN',
- subscribers: []
- };
- return new Promise((res, rej) => ssbClient.publish(content, (e, m) => e ? rej(e) : res(m)));
- },
- async updateJob(id, jobData) {
- const ssbClient = await openSsb();
- const current = await this.getJobById(id);
- const onlySubscribersChange = Object.keys(jobData).length > 0 && Object.keys(jobData).every(k => k === 'subscribers');
- if (!onlySubscribersChange && current.author !== ssbClient.id) throw new Error('Unauthorized');
- let blobId = jobData.image ?? current.image;
- if (blobId && /\(([^)]+)\)/.test(blobId)) blobId = blobId.match(/\(([^)]+)\)/)[1];
- const patch = {};
- for (const f of fields) {
- if (Object.prototype.hasOwnProperty.call(jobData, f) && jobData[f] !== undefined) {
- patch[f] = f === 'image' ? blobId : jobData[f];
- }
- }
- const next = {
- ...current,
- ...patch,
- image: ('image' in patch ? blobId : current.image),
- updatedAt: new Date().toISOString()
- };
- const tomb = {
- type: 'tombstone',
- target: id,
- deletedAt: new Date().toISOString(),
- author: ssbClient.id
- };
- const content = {
- type: 'job',
- job_type: next.job_type,
- title: next.title,
- description: next.description,
- requirements: next.requirements,
- languages: next.languages,
- job_time: next.job_time,
- tasks: next.tasks,
- location: next.location,
- vacants: next.vacants,
- salary: next.salary,
- image: next.image,
- author: current.author,
- createdAt: current.createdAt,
- updatedAt: next.updatedAt,
- status: next.status,
- subscribers: Array.isArray(next.subscribers) ? next.subscribers : [],
- replaces: id
- };
- await new Promise((res, rej) => ssbClient.publish(tomb, e => e ? rej(e) : res()));
- return new Promise((res, rej) => ssbClient.publish(content, (e, m) => e ? rej(e) : res(m)));
- },
- async updateJobStatus(id, status) {
- return this.updateJob(id, { status });
- },
- async deleteJob(id) {
- const ssbClient = await openSsb();
- const latestId = await this.getJobTipId(id);
- const job = await this.getJobById(latestId);
- if (job.author !== ssbClient.id) throw new Error('Unauthorized');
- const tomb = {
- type: 'tombstone',
- target: latestId,
- deletedAt: new Date().toISOString(),
- author: ssbClient.id
- };
- return new Promise((res, rej) => ssbClient.publish(tomb, (e, r) => e ? rej(e) : res(r)));
- },
- async listJobs(filter) {
- const ssbClient = await openSsb();
- const currentUserId = ssbClient.id;
- return new Promise((res, rej) => {
- pull(
- ssbClient.createLogStream({ limit: logLimit }),
- pull.collect((e, msgs) => {
- if (e) return rej(e);
- const tomb = new Set();
- const replaces = new Map();
- const referencedAsReplaces = new Set();
- const jobs = new Map();
- msgs.forEach(m => {
- const k = m.key;
- const c = m.value.content;
- if (!c) return;
- if (c.type === 'tombstone' && c.target) { tomb.add(c.target); return; }
- if (c.type !== 'job') return;
- if (c.replaces) { replaces.set(c.replaces, k); referencedAsReplaces.add(c.replaces); }
- jobs.set(k, { key: k, content: c });
- });
- const tipJobs = [];
- for (const [id, job] of jobs.entries()) {
- if (!referencedAsReplaces.has(id)) tipJobs.push(job);
- }
- const groups = {};
- for (const job of tipJobs) {
- const ancestor = job.content.replaces || job.key;
- if (!groups[ancestor]) groups[ancestor] = [];
- groups[ancestor].push(job);
- }
- const liveTipIds = new Set();
- for (const groupJobs of Object.values(groups)) {
- let best = groupJobs[0];
- for (const job of groupJobs) {
- if (
- job.content.status === 'CLOSED' ||
- (best.content.status !== 'CLOSED' &&
- new Date(job.content.updatedAt || job.content.createdAt || 0) >
- new Date(best.content.updatedAt || best.content.createdAt || 0))
- ) {
- best = job;
- }
- }
- liveTipIds.add(best.key);
- }
- let list = Array.from(jobs.values())
- .filter(j => liveTipIds.has(j.key) && !tomb.has(j.key))
- .map(j => ({ id: j.key, ...j.content }));
- const F = String(filter).toUpperCase();
- if (F === 'MINE') list = list.filter(j => j.author === currentUserId);
- else if (F === 'REMOTE') list = list.filter(j => (j.location || '').toUpperCase() === 'REMOTE');
- else if (F === 'PRESENCIAL') list = list.filter(j => (j.location || '').toUpperCase() === 'PRESENCIAL');
- else if (F === 'FREELANCER') list = list.filter(j => (j.job_type || '').toUpperCase() === 'FREELANCER');
- else if (F === 'EMPLOYEE') list = list.filter(j => (j.job_type || '').toUpperCase() === 'EMPLOYEE');
- else if (F === 'OPEN') list = list.filter(j => (j.status || '').toUpperCase() === 'OPEN');
- else if (F === 'CLOSED') list = list.filter(j => (j.status || '').toUpperCase() === 'CLOSED');
- else if (F === 'RECENT') list = list.filter(j => moment(j.createdAt).isAfter(moment().subtract(24, 'hours')));
- if (F === 'TOP') list.sort((a, b) => parseFloat(b.salary || 0) - parseFloat(a.salary || 0));
- else list.sort((a, b) => new Date(b.createdAt) - new Date(a.createdAt));
- res(list);
- })
- );
- });
- },
- async getJobById(id) {
- const ssbClient = await openSsb();
- const all = await new Promise((r, j) => {
- pull(
- ssbClient.createLogStream({ limit: logLimit }),
- pull.collect((e, m) => e ? j(e) : r(m))
- );
- });
- const tomb = new Set();
- const replaces = new Map();
- all.forEach(m => {
- const c = m.value.content;
- if (!c) return;
- if (c.type === 'tombstone' && c.target) tomb.add(c.target);
- else if (c.type === 'job' && c.replaces) replaces.set(c.replaces, m.key);
- });
- let key = id;
- while (replaces.has(key)) key = replaces.get(key);
- if (tomb.has(key)) throw new Error('Job not found');
- const msg = await new Promise((r, j) => ssbClient.get(key, (e, m) => e ? j(e) : r(m)));
- if (!msg) throw new Error('Job not found');
- const { id: _dropId, replaces: _dropReplaces, ...safeContent } = msg.content || {};
- const clean = pickJobFields(safeContent);
- return { id: key, ...clean };
- },
- async getJobTipId(id) {
- const ssbClient = await openSsb();
- const all = await new Promise((r, j) => {
- pull(
- ssbClient.createLogStream({ limit: logLimit }),
- pull.collect((e, m) => e ? j(e) : r(m))
- );
- });
- const tomb = new Set();
- const replaces = new Map();
- all.forEach(m => {
- const c = m.value.content;
- if (!c) return;
- if (c.type === 'tombstone' && c.target) {
- tomb.add(c.target);
- } else if (c.type === 'job' && c.replaces) {
- replaces.set(c.replaces, m.key);
- }
- });
- let key = id;
- while (replaces.has(key)) key = replaces.get(key);
- if (tomb.has(key)) throw new Error('Job not found');
- return key;
- },
- async subscribeToJob(id, userId) {
- const latestId = await this.getJobTipId(id);
- const job = await this.getJobById(latestId);
- const current = Array.isArray(job.subscribers) ? job.subscribers.slice() : [];
- if (current.includes(userId)) throw new Error('Already subscribed');
- const next = current.concat(userId);
- return this.updateJob(latestId, { subscribers: next });
- },
- async unsubscribeFromJob(id, userId) {
- const latestId = await this.getJobTipId(id);
- const job = await this.getJobById(latestId);
- const current = Array.isArray(job.subscribers) ? job.subscribers.slice() : [];
- if (!current.includes(userId)) throw new Error('Not subscribed');
- const next = current.filter(uid => uid !== userId);
- return this.updateJob(latestId, { subscribers: next });
- }
- };
- };
|