jobs_model.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. const pull = require('../server/node_modules/pull-stream')
  2. const moment = require('../server/node_modules/moment')
  3. const { getConfig } = require('../configs/config-manager.js');
  4. const logLimit = getConfig().ssbLogStream?.limit || 1000;
  5. module.exports = ({ cooler }) => {
  6. let ssb;
  7. const openSsb = async () => {
  8. if (!ssb) ssb = await cooler.open();
  9. return ssb;
  10. };
  11. const fields = [
  12. 'job_type','title','description','requirements','languages',
  13. 'job_time','tasks','location','vacants','salary','image',
  14. 'author','createdAt','updatedAt','status','subscribers'
  15. ];
  16. const pickJobFields = (obj = {}) => ({
  17. job_type: obj.job_type,
  18. title: obj.title,
  19. description: obj.description,
  20. requirements: obj.requirements,
  21. languages: obj.languages,
  22. job_time: obj.job_time,
  23. tasks: obj.tasks,
  24. location: obj.location,
  25. vacants: obj.vacants,
  26. salary: obj.salary,
  27. image: obj.image,
  28. author: obj.author,
  29. createdAt: obj.createdAt,
  30. updatedAt: obj.updatedAt,
  31. status: obj.status,
  32. subscribers: Array.isArray(obj.subscribers) ? obj.subscribers : []
  33. });
  34. return {
  35. type: 'job',
  36. async createJob(jobData) {
  37. const ssbClient = await openSsb();
  38. let blobId = jobData.image;
  39. if (blobId && /\(([^)]+)\)/.test(blobId)) blobId = blobId.match(/\(([^)]+)\)/)[1];
  40. const base = pickJobFields(jobData);
  41. const content = {
  42. type: 'job',
  43. ...base,
  44. image: blobId,
  45. author: ssbClient.id,
  46. createdAt: new Date().toISOString(),
  47. status: 'OPEN',
  48. subscribers: []
  49. };
  50. return new Promise((res, rej) => ssbClient.publish(content, (e, m) => e ? rej(e) : res(m)));
  51. },
  52. async updateJob(id, jobData) {
  53. const ssbClient = await openSsb();
  54. const current = await this.getJobById(id);
  55. const onlySubscribersChange = Object.keys(jobData).length > 0 && Object.keys(jobData).every(k => k === 'subscribers');
  56. if (!onlySubscribersChange && current.author !== ssbClient.id) throw new Error('Unauthorized');
  57. let blobId = jobData.image ?? current.image;
  58. if (blobId && /\(([^)]+)\)/.test(blobId)) blobId = blobId.match(/\(([^)]+)\)/)[1];
  59. const patch = {};
  60. for (const f of fields) {
  61. if (Object.prototype.hasOwnProperty.call(jobData, f) && jobData[f] !== undefined) {
  62. patch[f] = f === 'image' ? blobId : jobData[f];
  63. }
  64. }
  65. const next = {
  66. ...current,
  67. ...patch,
  68. image: ('image' in patch ? blobId : current.image),
  69. updatedAt: new Date().toISOString()
  70. };
  71. const tomb = {
  72. type: 'tombstone',
  73. target: id,
  74. deletedAt: new Date().toISOString(),
  75. author: ssbClient.id
  76. };
  77. const content = {
  78. type: 'job',
  79. job_type: next.job_type,
  80. title: next.title,
  81. description: next.description,
  82. requirements: next.requirements,
  83. languages: next.languages,
  84. job_time: next.job_time,
  85. tasks: next.tasks,
  86. location: next.location,
  87. vacants: next.vacants,
  88. salary: next.salary,
  89. image: next.image,
  90. author: current.author,
  91. createdAt: current.createdAt,
  92. updatedAt: next.updatedAt,
  93. status: next.status,
  94. subscribers: Array.isArray(next.subscribers) ? next.subscribers : [],
  95. replaces: id
  96. };
  97. await new Promise((res, rej) => ssbClient.publish(tomb, e => e ? rej(e) : res()));
  98. return new Promise((res, rej) => ssbClient.publish(content, (e, m) => e ? rej(e) : res(m)));
  99. },
  100. async updateJobStatus(id, status) {
  101. return this.updateJob(id, { status });
  102. },
  103. async deleteJob(id) {
  104. const ssbClient = await openSsb();
  105. const latestId = await this.getJobTipId(id);
  106. const job = await this.getJobById(latestId);
  107. if (job.author !== ssbClient.id) throw new Error('Unauthorized');
  108. const tomb = {
  109. type: 'tombstone',
  110. target: latestId,
  111. deletedAt: new Date().toISOString(),
  112. author: ssbClient.id
  113. };
  114. return new Promise((res, rej) => ssbClient.publish(tomb, (e, r) => e ? rej(e) : res(r)));
  115. },
  116. async listJobs(filter) {
  117. const ssbClient = await openSsb();
  118. const currentUserId = ssbClient.id;
  119. return new Promise((res, rej) => {
  120. pull(
  121. ssbClient.createLogStream({ limit: logLimit }),
  122. pull.collect((e, msgs) => {
  123. if (e) return rej(e);
  124. const tomb = new Set();
  125. const replaces = new Map();
  126. const referencedAsReplaces = new Set();
  127. const jobs = new Map();
  128. msgs.forEach(m => {
  129. const k = m.key;
  130. const c = m.value.content;
  131. if (!c) return;
  132. if (c.type === 'tombstone' && c.target) { tomb.add(c.target); return; }
  133. if (c.type !== 'job') return;
  134. if (c.replaces) { replaces.set(c.replaces, k); referencedAsReplaces.add(c.replaces); }
  135. jobs.set(k, { key: k, content: c });
  136. });
  137. const tipJobs = [];
  138. for (const [id, job] of jobs.entries()) {
  139. if (!referencedAsReplaces.has(id)) tipJobs.push(job);
  140. }
  141. const groups = {};
  142. for (const job of tipJobs) {
  143. const ancestor = job.content.replaces || job.key;
  144. if (!groups[ancestor]) groups[ancestor] = [];
  145. groups[ancestor].push(job);
  146. }
  147. const liveTipIds = new Set();
  148. for (const groupJobs of Object.values(groups)) {
  149. let best = groupJobs[0];
  150. for (const job of groupJobs) {
  151. if (
  152. job.content.status === 'CLOSED' ||
  153. (best.content.status !== 'CLOSED' &&
  154. new Date(job.content.updatedAt || job.content.createdAt || 0) >
  155. new Date(best.content.updatedAt || best.content.createdAt || 0))
  156. ) {
  157. best = job;
  158. }
  159. }
  160. liveTipIds.add(best.key);
  161. }
  162. let list = Array.from(jobs.values())
  163. .filter(j => liveTipIds.has(j.key) && !tomb.has(j.key))
  164. .map(j => ({ id: j.key, ...j.content }));
  165. const F = String(filter).toUpperCase();
  166. if (F === 'MINE') list = list.filter(j => j.author === currentUserId);
  167. else if (F === 'REMOTE') list = list.filter(j => (j.location || '').toUpperCase() === 'REMOTE');
  168. else if (F === 'PRESENCIAL') list = list.filter(j => (j.location || '').toUpperCase() === 'PRESENCIAL');
  169. else if (F === 'FREELANCER') list = list.filter(j => (j.job_type || '').toUpperCase() === 'FREELANCER');
  170. else if (F === 'EMPLOYEE') list = list.filter(j => (j.job_type || '').toUpperCase() === 'EMPLOYEE');
  171. else if (F === 'OPEN') list = list.filter(j => (j.status || '').toUpperCase() === 'OPEN');
  172. else if (F === 'CLOSED') list = list.filter(j => (j.status || '').toUpperCase() === 'CLOSED');
  173. else if (F === 'RECENT') list = list.filter(j => moment(j.createdAt).isAfter(moment().subtract(24, 'hours')));
  174. if (F === 'TOP') list.sort((a, b) => parseFloat(b.salary || 0) - parseFloat(a.salary || 0));
  175. else list.sort((a, b) => new Date(b.createdAt) - new Date(a.createdAt));
  176. res(list);
  177. })
  178. );
  179. });
  180. },
  181. async getJobById(id) {
  182. const ssbClient = await openSsb();
  183. const all = await new Promise((r, j) => {
  184. pull(
  185. ssbClient.createLogStream({ limit: logLimit }),
  186. pull.collect((e, m) => e ? j(e) : r(m))
  187. );
  188. });
  189. const tomb = new Set();
  190. const replaces = new Map();
  191. all.forEach(m => {
  192. const c = m.value.content;
  193. if (!c) return;
  194. if (c.type === 'tombstone' && c.target) tomb.add(c.target);
  195. else if (c.type === 'job' && c.replaces) replaces.set(c.replaces, m.key);
  196. });
  197. let key = id;
  198. while (replaces.has(key)) key = replaces.get(key);
  199. if (tomb.has(key)) throw new Error('Job not found');
  200. const msg = await new Promise((r, j) => ssbClient.get(key, (e, m) => e ? j(e) : r(m)));
  201. if (!msg) throw new Error('Job not found');
  202. const { id: _dropId, replaces: _dropReplaces, ...safeContent } = msg.content || {};
  203. const clean = pickJobFields(safeContent);
  204. return { id: key, ...clean };
  205. },
  206. async getJobTipId(id) {
  207. const ssbClient = await openSsb();
  208. const all = await new Promise((r, j) => {
  209. pull(
  210. ssbClient.createLogStream({ limit: logLimit }),
  211. pull.collect((e, m) => e ? j(e) : r(m))
  212. );
  213. });
  214. const tomb = new Set();
  215. const replaces = new Map();
  216. all.forEach(m => {
  217. const c = m.value.content;
  218. if (!c) return;
  219. if (c.type === 'tombstone' && c.target) {
  220. tomb.add(c.target);
  221. } else if (c.type === 'job' && c.replaces) {
  222. replaces.set(c.replaces, m.key);
  223. }
  224. });
  225. let key = id;
  226. while (replaces.has(key)) key = replaces.get(key);
  227. if (tomb.has(key)) throw new Error('Job not found');
  228. return key;
  229. },
  230. async subscribeToJob(id, userId) {
  231. const latestId = await this.getJobTipId(id);
  232. const job = await this.getJobById(latestId);
  233. const current = Array.isArray(job.subscribers) ? job.subscribers.slice() : [];
  234. if (current.includes(userId)) throw new Error('Already subscribed');
  235. const next = current.concat(userId);
  236. return this.updateJob(latestId, { subscribers: next });
  237. },
  238. async unsubscribeFromJob(id, userId) {
  239. const latestId = await this.getJobTipId(id);
  240. const job = await this.getJobById(latestId);
  241. const current = Array.isArray(job.subscribers) ? job.subscribers.slice() : [];
  242. if (!current.includes(userId)) throw new Error('Not subscribed');
  243. const next = current.filter(uid => uid !== userId);
  244. return this.updateJob(latestId, { subscribers: next });
  245. }
  246. };
  247. };