// trending_model.js (5.7 KB)
  1. const pull = require('../server/node_modules/pull-stream');
  2. const { getConfig } = require('../configs/config-manager.js');
  3. const logLimit = getConfig().ssbLogStream?.limit || 1000;
  4. module.exports = ({ cooler }) => {
  // Cached client once an open has succeeded.
  let ssb;
  // In-flight open shared by concurrent callers; null when no open is pending.
  let pendingOpen = null;

  /**
   * Returns the shared client, opening it through `cooler.open()` on first use.
   *
   * Callers that arrive while an open is still pending wait on that same
   * attempt instead of starting another one. A failed attempt is not cached,
   * so the next call tries again.
   *
   * @returns {Promise<object>} whatever `cooler.open()` resolves to.
   */
  const openSsb = async () => {
    if (ssb) return ssb;
    if (!pendingOpen) {
      pendingOpen = Promise.resolve(cooler.open())
        .then((client) => {
          ssb = client;
          return client;
        })
        .finally(() => {
          // Cleared on success and on failure: success is served from `ssb`,
          // failure must not be remembered.
          pendingOpen = null;
        });
    }
    return pendingOpen;
  };
  /**
   * Best-effort check of whether a blob is available through the client.
   *
   * Never rejects: an error passed to the callback, a falsy answer, or a
   * synchronous throw from `blobs.has` all resolve to `false`, so one bad
   * reference cannot fail a batch of checks run under `Promise.all`.
   *
   * @param {object} ssbClient client exposing `blobs.has(url, callback)`.
   * @param {string} url blob reference to look up.
   * @returns {Promise<boolean>} `true` only when the lookup succeeded and reported the blob.
   */
  const hasBlob = (ssbClient, url) =>
    new Promise((resolve) => {
      try {
        ssbClient.blobs.has(url, (err, has) => {
          resolve(!err && Boolean(has));
        });
      } catch (_err) {
        // The reference comes from message content; treat a throwing lookup
        // the same as a missing blob.
        resolve(false);
      }
    });
  // Content types accepted as a type filter by listTrending and as votable by createVote.
  const types = [
    'bookmark',
    'votes',
    'feed',
    'image',
    'audio',
    'video',
    'document',
    'transfer',
  ];

  // Opinion categories accepted by createVote.
  const categories = [
    'interesting',
    'necessary',
    'funny',
    'disgusting',
    'sensible',
    'propaganda',
    'adultOnly',
    'boring',
    'confusing',
    'inspiring',
    'spam',
  ];
  // Number of entries in a message's `opinions_inhabitants` list (0 when absent).
  const voterCount = (m) => (m.value.content.opinions_inhabitants || []).length;

  // De-duplication key for a message: same type plus same identifying fields means "same item".
  const signatureOf = (m) => {
    const c = m.value?.content || {};
    const squash = (text) => (text || '').replace(/\s+/g, ' ').trim();
    switch (c.type) {
      case 'document':
      case 'image':
      case 'audio':
      case 'video':
        return `${c.type}::${(c.url || '').trim()}`;
      case 'bookmark':
        return `bookmark::${(c.url || '').trim().toLowerCase()}`;
      case 'feed':
        return `feed::${squash(c.text)}`;
      case 'votes':
        return `votes::${squash(c.question)}`;
      case 'transfer':
        return `transfer::${c.concept || ''}|${c.amount || ''}|${c.from || ''}|${c.to || ''}|${c.deadline || ''}`;
      default:
        return `key::${m.key}`;
    }
  };

  /**
   * Lists messages that carry an `opinions` field, most-voted first.
   *
   * Steps: read the log (requesting at most `logLimit` entries), drop
   * tombstoned and replaced messages, drop blob-backed items whose blob is
   * not available, keep only the newest message per signature, then apply
   * `filter` and sort.
   *
   * @param {string} [filter='ALL'] one of:
   *   - 'ALL': everything, no vote requirement;
   *   - a value from `types`: only that content type, no vote requirement;
   *   - 'MINE': authored by the local id, with at least one voter;
   *   - 'RECENT': timestamp within the last 24 hours, with at least one voter;
   *   - 'TOP': at least one voter, ties broken by newest timestamp;
   *   - any other value: at least one voter.
   * @returns {Promise<{filtered: object[]}>} the matching log messages.
   */
  const listTrending = async (filter = 'ALL') => {
    const ssbClient = await openSsb();
    const userId = ssbClient.id;
    const messages = await new Promise((resolve, reject) => {
      pull(
        ssbClient.createLogStream({ limit: logLimit }),
        pull.collect((err, collected) => (err ? reject(err) : resolve(collected)))
      );
    });

    const excludedTypes = ['task', 'event', 'report'];
    const tombstoned = new Set();
    const replaced = new Set();
    const itemsById = new Map();
    for (const m of messages) {
      const c = m.value?.content;
      if (!c) continue;
      if (c.type === 'tombstone' && c.target) {
        tombstoned.add(c.target);
        continue;
      }
      if (c.opinions && !tombstoned.has(m.key) && !excludedTypes.includes(c.type)) {
        if (c.replaces) replaced.add(c.replaces);
        itemsById.set(m.key, m);
      }
    }
    for (const id of replaced) itemsById.delete(id);
    // A tombstone can appear in the log after the message it targets, so the
    // check inside the loop is not enough; remove those targets here.
    for (const id of tombstoned) itemsById.delete(id);

    const blobTypes = ['document', 'image', 'audio', 'video'];
    const checked = await Promise.all(
      [...itemsById.values()].map(async (m) => {
        const c = m.value.content;
        if (blobTypes.includes(c.type) && c.url && !(await hasBlob(ssbClient, c.url))) {
          return null;
        }
        return m;
      })
    );

    // Keep only the newest message for each signature.
    const bySig = new Map();
    for (const m of checked) {
      if (!m) continue;
      const sig = signatureOf(m);
      const prev = bySig.get(sig);
      if (!prev || (m.value?.timestamp || 0) > (prev.value?.timestamp || 0)) {
        bySig.set(sig, m);
      }
    }
    let items = [...bySig.values()];

    const isTypeFilter = types.includes(filter);
    if (filter === 'MINE') {
      items = items.filter((m) => m.value.author === userId);
    } else if (filter === 'RECENT') {
      const now = Date.now();
      items = items.filter((m) => now - m.value.timestamp < 24 * 60 * 60 * 1000);
    }
    if (isTypeFilter) {
      items = items.filter((m) => m.value.content.type === filter);
    }
    if (filter !== 'ALL' && !isTypeFilter) {
      items = items.filter((m) => voterCount(m) > 0);
    }

    items.sort((a, b) => {
      const diff = voterCount(b) - voterCount(a);
      // Only 'TOP' breaks ties by recency; other filters keep the existing order.
      if (diff !== 0 || filter !== 'TOP') return diff;
      return b.value.timestamp - a.value.timestamp;
    });
    return { filtered: items };
  };
  /**
   * Looks up one message through the shared client's `get(id, callback)`.
   *
   * @param {string} id identifier handed to `get`.
   * @returns {Promise<object>} the value passed to the callback; rejects with the callback's error.
   */
  const getMessageById = async (id) => {
    const client = await openSsb();
    const lookup = new Promise((resolve, reject) => {
      client.get(id, (err, msg) => {
        if (err) {
          reject(err);
          return;
        }
        resolve(msg);
      });
    });
    return lookup;
  };
  /**
   * Records the local user's vote on a piece of content.
   *
   * A vote is stored by publishing a copy of the content with the chosen
   * category incremented, the local id appended to `opinions_inhabitants`
   * and `replaces` pointing at the original, plus a tombstone for the original.
   *
   * The replacement is published first. If a publish fails part-way, the
   * original is then either untouched or already superseded, never
   * tombstoned without a successor.
   *
   * @param {string} contentId id of the message being voted on.
   * @param {string} category one of `categories`.
   * @returns {Promise<object>} the result passed back by `publish` for the replacement message.
   * @throws {Error} 'Invalid voting category', 'Content not found',
   *   'Voting not allowed on this content type' or 'Already voted';
   *   publish errors are propagated as-is.
   */
  const createVote = async (contentId, category) => {
    // Validate before any I/O.
    if (!categories.includes(category)) throw new Error('Invalid voting category');

    const ssbClient = await openSsb();
    const userId = ssbClient.id;
    const msg = await getMessageById(contentId);
    if (!msg || !msg.content) throw new Error('Content not found');

    const { content } = msg;
    if (!types.includes(content.type) || ['task', 'event', 'report'].includes(content.type)) {
      throw new Error('Voting not allowed on this content type');
    }
    if (content.opinions_inhabitants?.includes(userId)) throw new Error('Already voted');

    // One timestamp so both records agree.
    const now = new Date().toISOString();
    const updated = {
      ...content,
      opinions: {
        ...content.opinions,
        [category]: (content.opinions?.[category] || 0) + 1
      },
      opinions_inhabitants: [...(content.opinions_inhabitants || []), userId],
      updatedAt: now,
      replaces: contentId
    };
    const tombstone = {
      type: 'tombstone',
      target: contentId,
      deletedAt: now
    };

    const publish = (record) =>
      new Promise((resolve, reject) => {
        ssbClient.publish(record, (err, result) => (err ? reject(err) : resolve(result)));
      });

    const result = await publish(updated);
    await publish(tombstone);
    return result;
  };
  162. return { listTrending, getMessageById, createVote, types, categories };
  163. };