forum_model.js 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. const pull = require('../server/node_modules/pull-stream');
  2. module.exports = ({ cooler }) => {
  3. let ssb, userId;
  4. const openSsb = async () => {
  5. if (!ssb) {
  6. ssb = await cooler.open();
  7. userId = ssb.id;
  8. }
  9. return ssb;
  10. };
  11. async function collectTombstones(ssbClient) {
  12. return new Promise((resolve, reject) => {
  13. const tomb = new Set();
  14. pull(
  15. ssbClient.createLogStream(),
  16. pull.filter(m => m.value.content?.type === 'tombstone' && m.value.content.target),
  17. pull.drain(m => tomb.add(m.value.content.target), err => err ? reject(err) : resolve(tomb))
  18. );
  19. });
  20. }
  21. async function findActiveVote(ssbClient, targetId, voter) {
  22. const tombstoned = await collectTombstones(ssbClient);
  23. return new Promise((resolve, reject) => {
  24. pull(
  25. ssbClient.links({ source: voter, dest: targetId, rel: 'vote', values: true, keys: true }),
  26. pull.filter(link => !tombstoned.has(link.key)),
  27. pull.collect((err, links) => err ? reject(err) : resolve(links))
  28. );
  29. });
  30. }
  31. async function aggregateVotes(ssbClient, targetId) {
  32. const tombstoned = await collectTombstones(ssbClient);
  33. return new Promise((resolve, reject) => {
  34. let positives = 0, negatives = 0;
  35. pull(
  36. ssbClient.links({ source: null, dest: targetId, rel: 'vote', values: true, keys: true }),
  37. pull.filter(link => link.value.content?.vote && !tombstoned.has(link.key)),
  38. pull.drain(
  39. link => link.value.content.vote.value > 0 ? positives++ : negatives++,
  40. err => err ? reject(err) : resolve({ positives, negatives })
  41. )
  42. );
  43. });
  44. }
  45. function nestReplies(flat) {
  46. const lookup = new Map();
  47. const roots = [];
  48. for (const msg of flat) {
  49. msg.children = [];
  50. lookup.set(msg.key, msg);
  51. }
  52. for (const msg of flat) {
  53. if (msg.parent && lookup.has(msg.parent)) {
  54. lookup.get(msg.parent).children.push(msg);
  55. } else {
  56. roots.push(msg);
  57. }
  58. }
  59. return roots;
  60. }
  61. async function getMessageById(id) {
  62. const ssbClient = await openSsb();
  63. const msgs = await new Promise((res, rej) =>
  64. pull(ssbClient.createLogStream(), pull.collect((err, data) => err ? rej(err) : res(data)))
  65. );
  66. const msg = msgs.find(m => m.key === id && m.value.content?.type === 'forum');
  67. if (!msg) throw new Error('Message not found');
  68. return { key: msg.key, ...msg.value.content, timestamp: msg.value.timestamp };
  69. }
  70. return {
  71. createForum: async (category, title, text) => {
  72. const ssbClient = await openSsb();
  73. const content = {
  74. type: 'forum',
  75. category,
  76. title,
  77. text,
  78. createdAt: new Date().toISOString(),
  79. author: userId,
  80. votes: { positives: 0, negatives: 0 },
  81. votes_inhabitants: []
  82. };
  83. return new Promise((resolve, reject) =>
  84. ssbClient.publish(content, (err, res) => err ? reject(err) : resolve({ key: res.key, ...content }))
  85. );
  86. },
  87. addMessageToForum: async (forumId, message, parentId = null) => {
  88. const ssbClient = await openSsb();
  89. const content = {
  90. ...message,
  91. root: forumId,
  92. type: 'forum',
  93. author: userId,
  94. timestamp: new Date().toISOString(),
  95. votes: { positives: 0, negatives: 0 },
  96. votes_inhabitants: []
  97. };
  98. if (parentId) content.branch = parentId;
  99. return new Promise((resolve, reject) =>
  100. ssbClient.publish(content, (err, res) => err ? reject(err) : resolve(res))
  101. );
  102. },
  103. voteContent: async (targetId, value) => {
  104. const ssbClient = await openSsb();
  105. const whoami = await new Promise((res, rej) =>
  106. ssbClient.whoami((err, info) => err ? rej(err) : res(info))
  107. );
  108. const voter = whoami.id;
  109. const newVal = parseInt(value, 10);
  110. const existing = await findActiveVote(ssbClient, targetId, voter);
  111. if (existing.length > 0) {
  112. const prev = existing[0].value.content.vote.value;
  113. if (prev === newVal) return existing[0];
  114. await new Promise((resolve, reject) =>
  115. ssbClient.publish(
  116. { type: 'tombstone', target: existing[0].key, timestamp: new Date().toISOString(), author: voter },
  117. err => err ? reject(err) : resolve()
  118. )
  119. );
  120. }
  121. return new Promise((resolve, reject) =>
  122. ssbClient.publish(
  123. {
  124. type: 'vote',
  125. vote: { link: targetId, value: newVal },
  126. timestamp: new Date().toISOString(),
  127. author: voter
  128. },
  129. (err, result) => err ? reject(err) : resolve(result)
  130. )
  131. );
  132. },
  133. deleteForumById: async id => {
  134. const ssbClient = await openSsb();
  135. return new Promise((resolve, reject) =>
  136. ssbClient.publish(
  137. { type: 'tombstone', target: id, timestamp: new Date().toISOString(), author: userId },
  138. (err, res) => err ? reject(err) : resolve(res)
  139. )
  140. );
  141. },
  142. listAll: async filter => {
  143. const ssbClient = await openSsb();
  144. const msgs = await new Promise((res, rej) =>
  145. pull(ssbClient.createLogStream(), pull.collect((err, data) => err ? rej(err) : res(data)))
  146. );
  147. const deleted = new Set(
  148. msgs.filter(m => m.value.content?.type === 'tombstone').map(m => m.value.content.target)
  149. );
  150. const forums = msgs
  151. .filter(m => m.value.content?.type === 'forum' && !m.value.content.root && !deleted.has(m.key))
  152. .map(m => ({ ...m.value.content, key: m.key }));
  153. const forumsWithVotes = await Promise.all(
  154. forums.map(async f => {
  155. const { positives, negatives } = await aggregateVotes(ssbClient, f.key);
  156. return { ...f, positiveVotes: positives, negativeVotes: negatives };
  157. })
  158. );
  159. const repliesByRoot = {};
  160. msgs.forEach(m => {
  161. const c = m.value.content;
  162. if (c?.type === 'forum' && c.root && !deleted.has(m.key)) {
  163. repliesByRoot[c.root] = repliesByRoot[c.root] || [];
  164. repliesByRoot[c.root].push({ key: m.key, text: c.text, author: c.author, timestamp: m.value.timestamp });
  165. }
  166. });
  167. const final = await Promise.all(
  168. forumsWithVotes.map(async f => {
  169. const replies = repliesByRoot[f.key] || [];
  170. for (let r of replies) {
  171. const { positives: rp, negatives: rn } = await aggregateVotes(ssbClient, r.key);
  172. r.positiveVotes = rp;
  173. r.negativeVotes = rn;
  174. r.score = rp - rn;
  175. }
  176. const replyPos = replies.reduce((sum, r) => sum + (r.positiveVotes || 0), 0);
  177. const replyNeg = replies.reduce((sum, r) => sum + (r.negativeVotes || 0), 0);
  178. const positiveVotes = f.positiveVotes + replyPos;
  179. const negativeVotes = f.negativeVotes + replyNeg;
  180. const score = positiveVotes - negativeVotes;
  181. const participants = new Set(replies.map(r => r.author).concat(f.author));
  182. const messagesCount = replies.length + 1;
  183. const lastMessage =
  184. replies.length
  185. ? replies.reduce((a, b) => (new Date(a.timestamp) > new Date(b.timestamp) ? a : b))
  186. : null;
  187. return {
  188. ...f,
  189. positiveVotes,
  190. negativeVotes,
  191. score,
  192. participants: Array.from(participants),
  193. messagesCount,
  194. lastMessage,
  195. messages: replies
  196. };
  197. })
  198. );
  199. const filtered =
  200. filter === 'mine'
  201. ? final.filter(f => f.author === userId)
  202. : filter === 'recent'
  203. ? final.filter(f => new Date(f.createdAt).getTime() >= Date.now() - 86400000)
  204. : final;
  205. return filtered.sort((a, b) => new Date(b.createdAt) - new Date(a.createdAt));
  206. },
  207. getForumById: async id => {
  208. const ssbClient = await openSsb();
  209. const msgs = await new Promise((res, rej) =>
  210. pull(ssbClient.createLogStream(), pull.collect((err, data) => err ? rej(err) : res(data)))
  211. );
  212. const deleted = new Set(
  213. msgs.filter(m => m.value.content?.type === 'tombstone').map(m => m.value.content.target)
  214. );
  215. const original = msgs.find(m => m.key === id && !deleted.has(m.key));
  216. if (!original || original.value.content?.type !== 'forum') throw new Error('Forum not found');
  217. const base = original.value.content;
  218. const { positives, negatives } = await aggregateVotes(ssbClient, id);
  219. return {
  220. ...base,
  221. key: id,
  222. positiveVotes: positives,
  223. negativeVotes: negatives,
  224. score: positives - negatives
  225. };
  226. },
  227. getMessagesByForumId: async forumId => {
  228. const ssbClient = await openSsb();
  229. const msgs = await new Promise((res, rej) =>
  230. pull(ssbClient.createLogStream(), pull.collect((err, data) => err ? rej(err) : res(data)))
  231. );
  232. const deleted = new Set(
  233. msgs.filter(m => m.value.content?.type === 'tombstone').map(m => m.value.content.target)
  234. );
  235. const replies = msgs
  236. .filter(m => m.value.content?.type === 'forum' && m.value.content.root === forumId && !deleted.has(m.key))
  237. .map(m => ({
  238. key: m.key,
  239. text: m.value.content.text,
  240. author: m.value.content.author,
  241. timestamp: m.value.timestamp,
  242. parent: m.value.content.branch || null
  243. }));
  244. for (let r of replies) {
  245. const { positives: rp, negatives: rn } = await aggregateVotes(ssbClient, r.key);
  246. r.positiveVotes = rp;
  247. r.negativeVotes = rn;
  248. r.score = rp - rn;
  249. }
  250. const { positives: p, negatives: n } = await aggregateVotes(ssbClient, forumId);
  251. const replyPos = replies.reduce((sum, r) => sum + (r.positiveVotes || 0), 0);
  252. const replyNeg = replies.reduce((sum, r) => sum + (r.negativeVotes || 0), 0);
  253. const positiveVotes = p + replyPos;
  254. const negativeVotes = n + replyNeg;
  255. const totalScore = positiveVotes - negativeVotes;
  256. return {
  257. messages: nestReplies(replies),
  258. total: replies.length,
  259. positiveVotes,
  260. negativeVotes,
  261. totalScore
  262. };
  263. },
  264. getMessageById
  265. };
  266. };