models.js 56 KB


  1. "use strict";
  2. const debug = require("debug")("oasis");
  3. const { isRoot, isReply: isComment } = require("ssb-thread-schema");
  4. const lodash = require("lodash");
  5. const prettyMs = require("pretty-ms");
  6. const pullAbortable = require("pull-abortable");
  7. const pullParallelMap = require("pull-paramap");
  8. const pull = require("pull-stream");
  9. const pullSort = require("pull-sort");
  10. const ssbRef = require("ssb-ref");
  11. const isEncrypted = (message) => typeof message.value.content === "string";
  12. const isNotEncrypted = (message) => isEncrypted(message) === false;
  13. const isDecrypted = (message) =>
  14. lodash.get(message, "value.meta.private", false);
  15. const isPrivate = (message) => isEncrypted(message) || isDecrypted(message);
  16. const isNotPrivate = (message) => isPrivate(message) === false;
  17. const hasRoot = (message) =>
  18. ssbRef.isMsg(lodash.get(message, "value.content.root", null));
  19. const hasFork = (message) =>
  20. ssbRef.isMsg(lodash.get(message, "value.content.fork", null));
  21. const hasNoRoot = (message) => hasRoot(message) === false;
  22. const hasNoFork = (message) => hasFork(message) === false;
  23. const isPost = (message) =>
  24. lodash.get(message, "value.content.type") === "post" &&
  25. typeof lodash.get(message, "value.content.text") === "string";
  26. const isBlogPost = (message) =>
  27. lodash.get(message, "value.content.type") === "blog" &&
  28. typeof lodash.get(message, "value.content.title") === "string" &&
  29. ssbRef.isBlob(lodash.get(message, "value.content.blog", null));
  30. const isTextLike = (message) => isPost(message) || isBlogPost(message);
  31. // HACK: https://github.com/ssbc/ssb-thread-schema/issues/4
  32. const isSubtopic = require("ssb-thread-schema/post/nested-reply/validator");
  33. const nullImage = `&${"0".repeat(43)}=.sha256`;
  34. const defaultOptions = {
  35. private: true,
  36. reverse: true,
  37. meta: true,
  38. };
  39. const publicOnlyFilter = pull.filter(isNotPrivate);
  40. /** @param {object[]} customOptions */
  41. const configure = (...customOptions) =>
  42. Object.assign({}, defaultOptions, ...customOptions);
  43. module.exports = ({ cooler, isPublic }) => {
  44. const models = {};
  45. /**
  46. * The SSB-About plugin is a thin wrapper around the SSB-Social-Index plugin.
  47. * Unfortunately, this plugin has two problems that make it incompatible with
  48. * our needs:
  49. *
  50. * - We want to get the latest value from an author, like what someone calls
  51. * themselves, **not what other people call them**.
  52. * - The plugin has a bug where `false` isn't handled correctly, which is very
  53. * important since we use `publicWebHosting`, a boolean field.
  54. *
  55. * It feels very silly to have to maintain an alternative implementation of
  56. * SSB-About, but this is much smaller code and doesn't have either of the
  57. * above problems. Maybe this should be moved somewhere else in the future?
  58. */
  59. const getAbout = async ({ key, feedId }) => {
  60. const ssb = await cooler.open();
  61. const source = ssb.backlinks.read({
  62. reverse: true,
  63. query: [
  64. {
  65. $filter: {
  66. dest: feedId,
  67. value: {
  68. author: feedId,
  69. content: { type: "about", about: feedId },
  70. },
  71. },
  72. },
  73. ],
  74. });
  75. return new Promise((resolve, reject) =>
  76. pull(
  77. source,
  78. pull.find(
  79. (message) => message.value.content[key] !== undefined,
  80. (err, message) => {
  81. if (err) {
  82. reject(err);
  83. } else {
  84. if (message === null) {
  85. resolve(null);
  86. } else {
  87. resolve(message.value.content[key]);
  88. }
  89. }
  90. }
  91. )
  92. )
  93. );
  94. };
  95. // build a @mentions lookup cache
  96. // ==============================
  97. // one gotcha with ssb-query is: if we add `name: "my name"` to that query below,
  98. // it can trigger a full-scan of the database instead of better query planing
  99. // also doing multiple of those can be very slow (5 to 30s on my machine).
  100. // gotcha two is: there is no way to express (where msg.author == msg.value.content.about) so we need to do it as a pull.filter()
  101. // one drawback: is, it gives us all the about messages from forever, not just the latest
  102. // TODO: an alternative would be using ssb.names if available and just loading this as a fallback
  103. // Two lookup tables to remove old and duplicate names
  104. const feeds_to_name = {};
  105. let all_the_names = {};
  106. let dirty = false; // just stop mindless work (nothing changed) could be smarter thou
  107. let running = false; // don't run twice
  108. // transposeLookupTable flips the lookup around (form feed->name to name->feed)
  109. // and also enhances the entries with image and relationship info
  110. const transposeLookupTable = () => {
  111. if (!dirty) return;
  112. if (running) return;
  113. running = true;
  114. // invalidate old cache
  115. // regenerate a new thing because we don't know which entries will be gone
  116. all_the_names = {};
  117. const allFeeds = Object.keys(feeds_to_name);
  118. console.log(`updating ${allFeeds.length} feeds`);
  119. console.time("transpose-name-index");
  120. const lookups = [];
  121. for (const feed of allFeeds) {
  122. const e = feeds_to_name[feed];
  123. let pair = { feed, name: e.name };
  124. lookups.push(enhanceFeedInfo(pair));
  125. }
  126. // wait for all image and follow lookups
  127. Promise.all(lookups)
  128. .then(() => {
  129. dirty = false; // all updated
  130. running = false;
  131. console.timeEnd("transpose-name-index");
  132. })
  133. .catch((err) => {
  134. running = false;
  135. console.warn("lookup transposition failed:", err);
  136. });
  137. };
  138. // this function adds the avatar image and relationship to the all_the_names lookup table
  139. const enhanceFeedInfo = ({ feed, name }) => {
  140. return new Promise((resolve, reject) => {
  141. getAbout({ feedId: feed, key: "image" })
  142. .then((img) => {
  143. if (
  144. img !== null &&
  145. typeof img !== "string" &&
  146. typeof img === "object" &&
  147. typeof img.link === "string"
  148. ) {
  149. img = img.link;
  150. } else if (img === null) {
  151. img = nullImage; // default empty image if we don't have one
  152. }
  153. models.friend
  154. .getRelationship(feed)
  155. .then((rel) => {
  156. // append and update lookup table
  157. let feeds_named = all_the_names[name] || [];
  158. feeds_named.push({ feed, name, rel, img });
  159. all_the_names[name.toLowerCase()] = feeds_named;
  160. resolve();
  161. // TODO: append if these fail!?
  162. })
  163. .catch(reject);
  164. })
  165. .catch(reject);
  166. });
  167. };
  168. models.about = {
  169. publicWebHosting: async (feedId) => {
  170. const result = await getAbout({
  171. key: "publicWebHosting",
  172. feedId,
  173. });
  174. return result === true;
  175. },
  176. name: async (feedId) => {
  177. if (isPublic && (await models.about.publicWebHosting(feedId)) === false) {
  178. return "Redacted";
  179. }
  180. // TODO: could possibly use all_the_names
  181. return (
  182. (await getAbout({
  183. key: "name",
  184. feedId,
  185. })) || feedId.slice(1, 1 + 8)
  186. ); // First 8 chars of public key
  187. },
  188. named: (name) => {
  189. let found = [];
  190. let matched = Object.keys(all_the_names).filter((n) => {
  191. return n.startsWith(name.toLowerCase());
  192. });
  193. for (const m of matched) {
  194. found = found.concat(all_the_names[m]);
  195. }
  196. return found;
  197. },
  198. image: async (feedId) => {
  199. if (isPublic && (await models.about.publicWebHosting(feedId)) === false) {
  200. return nullImage;
  201. }
  202. const raw = await getAbout({
  203. key: "image",
  204. feedId,
  205. });
  206. if (raw == null || raw.link == null) {
  207. return nullImage;
  208. }
  209. if (typeof raw.link === "string") {
  210. return raw.link;
  211. }
  212. return raw;
  213. },
  214. description: async (feedId) => {
  215. if (isPublic && (await models.about.publicWebHosting(feedId)) === false) {
  216. return "Redacted";
  217. }
  218. const raw =
  219. (await getAbout({
  220. key: "description",
  221. feedId,
  222. })) || "";
  223. return raw;
  224. },
  225. // This needs to run in the background but also needs to be aborted
  226. // in index.js when the server closes. There's also an interval that
  227. // needs to be cleared. TODO: Ensure that there's never more than
  228. // one interval running at a time.
  229. _startNameWarmup() {
  230. const abortable = pullAbortable();
  231. let intervals = [];
  232. cooler.open().then((ssb) => {
  233. console.time("about-name-warmup"); // benchmark the time it takes to stream all existing about messages
  234. pull(
  235. ssb.query.read({
  236. live: true, // keep streaming new messages as they arrive
  237. query: [
  238. {
  239. $filter: {
  240. // all messages of type:about that have a name field that is typeof string
  241. value: {
  242. content: {
  243. type: "about",
  244. name: { $is: "string" },
  245. },
  246. },
  247. },
  248. },
  249. ],
  250. }),
  251. abortable,
  252. pull.filter((msg) => {
  253. // backlog of data is done, only new values from now on
  254. if (msg.sync && msg.sync === true) {
  255. console.timeEnd("about-name-warmup");
  256. transposeLookupTable(); // fire once now
  257. intervals.push(setInterval(transposeLookupTable, 1000 * 60)); // and then every 60 seconds
  258. return false;
  259. }
  260. // only pick messages about self
  261. return msg.value.author == msg.value.content.about;
  262. }),
  263. pull.drain((msg) => {
  264. const name = msg.value.content.name;
  265. const ts = msg.value.timestamp;
  266. const feed = msg.value.author;
  267. const newEntry = { name, ts };
  268. const currentEntry = feeds_to_name[feed];
  269. if (typeof currentEntry == "undefined") {
  270. dirty = true;
  271. feeds_to_name[feed] = newEntry;
  272. } else if (currentEntry.ts < ts) {
  273. // overwrite entry if it's newer
  274. dirty = true;
  275. feeds_to_name[feed] = newEntry;
  276. }
  277. })
  278. );
  279. });
  280. return {
  281. close: () => {
  282. abortable.abort();
  283. intervals.forEach((i) => clearInterval(i));
  284. },
  285. };
  286. },
  287. };
  288. models.blob = {
  289. get: async ({ blobId }) => {
  290. debug("get blob: %s", blobId);
  291. const ssb = await cooler.open();
  292. return ssb.blobs.get(blobId);
  293. },
  294. getResolved: async ({ blobId }) => {
  295. const bufferSource = await models.blob.get({ blobId });
  296. debug("got buffer source");
  297. return new Promise((resolve) => {
  298. pull(
  299. bufferSource,
  300. pull.collect(async (err, bufferArray) => {
  301. if (err) {
  302. await models.blob.want({ blobId });
  303. resolve(Buffer.alloc(0));
  304. } else {
  305. const buffer = Buffer.concat(bufferArray);
  306. resolve(buffer);
  307. }
  308. })
  309. );
  310. });
  311. },
  312. want: async ({ blobId }) => {
  313. debug("want blob: %s", blobId);
  314. cooler
  315. .open()
  316. .then((ssb) => {
  317. // This does not wait for the blob.
  318. ssb.blobs.want(blobId);
  319. })
  320. .catch((err) => {
  321. console.warn(`failed to want blob:${blobId}: ${err}`);
  322. });
  323. },
  324. search: async ({ query }) => {
  325. debug("blob search: %s", query);
  326. const ssb = await cooler.open();
  327. return new Promise((resolve, reject) => {
  328. ssb.meme.search(query, (err, blobs) => {
  329. if (err) return reject(err);
  330. return resolve(blobs);
  331. });
  332. });
  333. },
  334. };
  335. models.friend = {
  336. /** @param {{ feedId: string, following: boolean, blocking: boolean }} input */
  337. setRelationship: async ({ feedId, following, blocking }) => {
  338. if (following && blocking) {
  339. throw new Error("Cannot follow and block at the same time");
  340. }
  341. const current = await models.friend.getRelationship(feedId);
  342. const alreadySet =
  343. current.following === following && current.blocking === blocking;
  344. if (alreadySet) {
  345. // The following state is already set, don't re-set it.
  346. return;
  347. }
  348. const ssb = await cooler.open();
  349. const content = {
  350. type: "contact",
  351. contact: feedId,
  352. following,
  353. blocking,
  354. };
  355. transposeLookupTable(); // invalidate @mentions table
  356. return ssb.publish(content);
  357. },
  358. follow: (feedId) =>
  359. models.friend.setRelationship({
  360. feedId,
  361. following: true,
  362. blocking: false,
  363. }),
  364. unfollow: (feedId) =>
  365. models.friend.setRelationship({
  366. feedId,
  367. following: false,
  368. blocking: false,
  369. }),
  370. block: (feedId) =>
  371. models.friend.setRelationship({
  372. feedId,
  373. blocking: true,
  374. following: false,
  375. }),
  376. unblock: (feedId) =>
  377. models.friend.setRelationship({
  378. feedId,
  379. blocking: false,
  380. following: false,
  381. }),
  382. /**
  383. * @param feedId {string}
  384. * @returns {Promise<{me: boolean, following: boolean, blocking: boolean, followsMe: boolean }>}
  385. */
  386. getRelationship: async (feedId) => {
  387. const ssb = await cooler.open();
  388. const { id } = ssb;
  389. if (feedId === id) {
  390. return {
  391. me: true,
  392. following: false,
  393. blocking: false,
  394. followsMe: false,
  395. };
  396. }
  397. const isFollowing = await ssb.friends.isFollowing({
  398. source: id,
  399. dest: feedId,
  400. });
  401. const isBlocking = await ssb.friends.isBlocking({
  402. source: id,
  403. dest: feedId,
  404. });
  405. const followsMe = await ssb.friends.isFollowing({
  406. source: feedId,
  407. dest: id,
  408. });
  409. return {
  410. me: false,
  411. following: isFollowing,
  412. blocking: isBlocking,
  413. followsMe: followsMe,
  414. };
  415. },
  416. };
  417. models.meta = {
  418. myFeedId: async () => {
  419. const ssb = await cooler.open();
  420. const { id } = ssb;
  421. return id;
  422. },
  423. get: async (msgId) => {
  424. const ssb = await cooler.open();
  425. return ssb.get({
  426. id: msgId,
  427. meta: true,
  428. private: true,
  429. });
  430. },
  431. status: async () => {
  432. const ssb = await cooler.open();
  433. return ssb.status();
  434. },
  435. peers: async () => {
  436. const ssb = await cooler.open();
  437. const peersSource = await ssb.conn.peers();
  438. return new Promise((resolve, reject) => {
  439. pull(
  440. peersSource,
  441. // https://github.com/staltz/ssb-conn/issues/9
  442. pull.take(1),
  443. pull.collect((err, val) => {
  444. if (err) return reject(err);
  445. resolve(val[0]);
  446. })
  447. );
  448. });
  449. },
  450. connectedPeers: async () => {
  451. const peers = await models.meta.peers();
  452. return peers.filter(([address, data]) => {
  453. if (data.state === "connected") {
  454. return [address, data];
  455. }
  456. });
  457. },
  458. connStop: async () => {
  459. const ssb = await cooler.open();
  460. try {
  461. const result = await ssb.conn.stop();
  462. return result;
  463. } catch (e) {
  464. const expectedName = "TypeError";
  465. const expectedMessage = "Cannot read property 'close' of null";
  466. if (e.name === expectedName && e.message === expectedMessage) {
  467. // https://github.com/staltz/ssb-lan/issues/5
  468. debug("ssbConn is already stopped -- caught error");
  469. } else {
  470. throw new Error(e);
  471. }
  472. }
  473. },
  474. connStart: async () => {
  475. const ssb = await cooler.open();
  476. const result = await ssb.conn.start();
  477. return result;
  478. },
  479. connRestart: async () => {
  480. await models.meta.connStop();
  481. await models.meta.connStart();
  482. },
  483. sync: async () => {
  484. const ssb = await cooler.open();
  485. const progress = await ssb.progress();
  486. let previousTarget = progress.indexes.target;
  487. // Automatically timeout after 5 minutes.
  488. let keepGoing = true;
  489. const timeoutInterval = setTimeout(() => {
  490. keepGoing = false;
  491. }, 5 * 60 * 1000);
  492. await ssb.conn.start();
  493. // Promise that resolves the number of new messages after 5 seconds.
  494. const diff = async () =>
  495. new Promise((resolve) => {
  496. setTimeout(async () => {
  497. const currentProgress = await ssb.progress();
  498. const currentTarget = currentProgress.indexes.target;
  499. const difference = currentTarget - previousTarget;
  500. previousTarget = currentTarget;
  501. debug(`Difference: ${difference} bytes`);
  502. resolve(difference);
  503. }, 5000);
  504. });
  505. debug("Starting sync, waiting for new messages...");
  506. // Wait until we **start** receiving messages.
  507. while (keepGoing && (await diff()) === 0) {
  508. debug("Received no new messages.");
  509. }
  510. debug("Finished waiting for first new message.");
  511. // Wait until we **stop** receiving messages.
  512. while (keepGoing && (await diff()) > 0) {
  513. debug(`Still receiving new messages...`);
  514. }
  515. debug("Finished waiting for last new message.");
  516. clearInterval(timeoutInterval);
  517. await ssb.conn.stop();
  518. },
  519. acceptInvite: async (invite) => {
  520. const ssb = await cooler.open();
  521. return await ssb.invite.accept(invite);
  522. },
  523. // Returns promise, does not wait for rebuild to finish.
  524. rebuild: async () => {
  525. const ssb = await cooler.open();
  526. return ssb.rebuild();
  527. },
  528. };
  529. const isLooseRoot = (message) => {
  530. const conditions = [
  531. isPost(message),
  532. hasNoRoot(message),
  533. hasNoFork(message),
  534. ];
  535. return conditions.every((x) => x);
  536. };
  537. const isLooseSubtopic = (message) => {
  538. const conditions = [isPost(message), hasRoot(message), hasFork(message)];
  539. return conditions.every((x) => x);
  540. };
  541. const isLooseComment = (message) => {
  542. const conditions = [isPost(message), hasRoot(message), hasNoFork(message)];
  543. return conditions.every((x) => x === true);
  544. };
  545. const maxMessages = 64;
  546. const getMessages = async ({
  547. myFeedId,
  548. customOptions,
  549. ssb,
  550. query,
  551. filter = null,
  552. }) => {
  553. const options = configure({ query, index: "DTA" }, customOptions);
  554. const source = ssb.backlinks.read(options);
  555. const basicSocialFilter = await socialFilter();
  556. return new Promise((resolve, reject) => {
  557. pull(
  558. source,
  559. basicSocialFilter,
  560. pull.filter(
  561. (msg) =>
  562. isNotEncrypted(msg) &&
  563. isPost(msg) &&
  564. (filter == null || filter(msg) === true)
  565. ),
  566. pull.take(maxMessages),
  567. pull.collect((err, collectedMessages) => {
  568. if (err) {
  569. reject(err);
  570. } else {
  571. resolve(transform(ssb, collectedMessages, myFeedId));
  572. }
  573. })
  574. );
  575. });
  576. };
  577. /**
  578. * Returns a function that filters messages based on who published the message.
  579. *
  580. * `null` means we don't care, `true` means it must be true, and `false` means
  581. * that the value must be false. For example, if you set `me = true` then it
  582. * will only allow messages that are from you. If you set `blocking = true`
  583. * then you only see message from people you block.
  584. */
  585. const socialFilter = async ({
  586. following = null,
  587. blocking = false,
  588. me = null,
  589. } = {}) => {
  590. const ssb = await cooler.open();
  591. const { id } = ssb;
  592. const relationshipObject = await new Promise((resolve, reject) => {
  593. ssb.friends.graph((err, graph) => {
  594. if (err) {
  595. console.error(err);
  596. reject(err);
  597. }
  598. resolve(graph[id] || {});
  599. });
  600. });
  601. const followingList = Object.entries(relationshipObject)
  602. .filter(([, val]) => val >= 0)
  603. .map(([key]) => key);
  604. const blockingList = Object.entries(relationshipObject)
  605. .filter(([, val]) => val === -1)
  606. .map(([key]) => key);
  607. return pull.filter((message) => {
  608. if (message.value.author === id) {
  609. return me !== false;
  610. } else {
  611. return (
  612. (following === null ||
  613. followingList.includes(message.value.author) === following) &&
  614. (blocking === null ||
  615. blockingList.includes(message.value.author) === blocking)
  616. );
  617. }
  618. });
  619. };
  620. const getUserInfo = async (feedId) => {
  621. const pendingName = models.about.name(feedId);
  622. const pendingAvatarMsg = models.about.image(feedId);
  623. const pending = [pendingName, pendingAvatarMsg];
  624. const [name, avatarMsg] = await Promise.all(pending);
  625. const avatarId =
  626. avatarMsg != null && typeof avatarMsg.link === "string"
  627. ? avatarMsg.link || nullImage
  628. : avatarMsg || nullImage;
  629. const avatarUrl = `/image/64/${encodeURIComponent(avatarId)}`;
  630. return { name, feedId, avatarId, avatarUrl };
  631. };
  632. function getRecipientFeedId(recipient) {
  633. if (typeof recipient === "string") {
  634. return recipient;
  635. } else {
  636. return recipient.link;
  637. }
  638. }
  639. const transform = (ssb, messages, myFeedId) =>
  640. Promise.all(
  641. messages.map(async (msg) => {
  642. debug("transforming %s", msg.key);
  643. if (msg == null) {
  644. return null;
  645. }
  646. const filterQuery = {
  647. $filter: {
  648. dest: msg.key,
  649. },
  650. };
  651. const referenceStream = ssb.backlinks.read({
  652. query: [filterQuery],
  653. index: "DTA", // use asserted timestamps
  654. private: true,
  655. meta: true,
  656. });
  657. if (lodash.get(msg, "value.content.type") === "blog") {
  658. const blogTitle = msg.value.content.title;
  659. const blogSummary = lodash.get(msg, "value.content.summary", null);
  660. const blobId = msg.value.content.blog;
  661. const blogContent = await models.blob.getResolved({ blobId });
  662. let textElements = [`# ${blogTitle}`, blogContent];
  663. if (blogSummary) {
  664. textElements.splice(1, 0, `**${blogSummary}**`);
  665. }
  666. lodash.set(msg, "value.content.text", textElements.join("\n\n"));
  667. }
  668. const rawVotes = await new Promise((resolve, reject) => {
  669. pull(
  670. referenceStream,
  671. pull.filter(
  672. (ref) =>
  673. isNotEncrypted(ref) &&
  674. ref.value.content.type === "vote" &&
  675. ref.value.content.vote &&
  676. typeof ref.value.content.vote.value === "number" &&
  677. ref.value.content.vote.value >= 0 &&
  678. ref.value.content.vote.link === msg.key
  679. ),
  680. pull.collect((err, collectedMessages) => {
  681. if (err) {
  682. reject(err);
  683. } else {
  684. resolve(collectedMessages);
  685. }
  686. })
  687. );
  688. });
  689. // { @key: 1, @key2: 0, @key3: 1 }
  690. //
  691. // only one vote per person!
  692. const reducedVotes = rawVotes.reduce((acc, vote) => {
  693. acc[vote.value.author] = vote.value.content.vote.value;
  694. return acc;
  695. }, {});
  696. // gets *only* the people who voted 1
  697. // [ @key, @key, @key ]
  698. const voters = Object.entries(reducedVotes)
  699. .filter(([, value]) => value === 1)
  700. .map(([key]) => key);
  701. // get an array of voter names, for display on hovers
  702. const pendingVoterNames = voters.map(async (author) => ({
  703. name: await models.about.name(author),
  704. key: author,
  705. }));
  706. const voterNames = await Promise.all(pendingVoterNames);
  707. const { name, avatarId, avatarUrl } = await getUserInfo(
  708. msg.value.author
  709. );
  710. if (isPublic) {
  711. const publicOptIn = await models.about.publicWebHosting(
  712. msg.value.author
  713. );
  714. if (publicOptIn === false) {
  715. lodash.set(
  716. msg,
  717. "value.content.text",
  718. "This is a public message that has been redacted because Oasis is running in public mode. This redaction is only meant to make Oasis consistent with other public SSB viewers. Please do not mistake this for privacy. All public messages are public. Any peer on the SSB network can see this message."
  719. );
  720. if (msg.value.content.contentWarning != null) {
  721. msg.value.content.contentWarning = "Redacted";
  722. }
  723. }
  724. }
  725. const channel = lodash.get(msg, "value.content.channel");
  726. const hasChannel = typeof channel === "string" && channel.length > 2;
  727. if (hasChannel && hasNoRoot(msg)) {
  728. msg.value.content.text += `\n\n#${channel}`;
  729. }
  730. const ts = new Date(msg.value.timestamp);
  731. let isoTs;
  732. try {
  733. isoTs = ts.toISOString();
  734. } catch (e) {
  735. // Just in case it's an invalid date. :(
  736. debug(e);
  737. const receivedTs = new Date(msg.timestamp);
  738. isoTs = receivedTs.toISOString();
  739. }
  740. lodash.set(msg, "value.meta.timestamp.received.iso8601", isoTs);
  741. const ago = Date.now() - Number(ts);
  742. const prettyAgo = prettyMs(ago, { compact: true });
  743. lodash.set(msg, "value.meta.timestamp.received.since", prettyAgo);
  744. lodash.set(msg, "value.meta.author.name", name);
  745. lodash.set(msg, "value.meta.author.avatar", {
  746. id: avatarId,
  747. url: avatarUrl,
  748. });
  749. if (isTextLike(msg) && hasNoRoot(msg) && hasNoFork(msg)) {
  750. lodash.set(msg, "value.meta.postType", "post");
  751. } else if (isTextLike(msg) && hasRoot(msg) && hasNoFork(msg)) {
  752. lodash.set(msg, "value.meta.postType", "comment");
  753. } else if (isTextLike(msg) && hasRoot(msg) && hasFork(msg)) {
  754. lodash.set(msg, "value.meta.postType", "subtopic");
  755. } else {
  756. lodash.set(msg, "value.meta.postType", "mystery");
  757. }
  758. lodash.set(msg, "value.meta.votes", voterNames);
  759. lodash.set(msg, "value.meta.voted", voters.includes(myFeedId));
  760. if (isPrivate(msg)) {
  761. msg.value.meta.recpsInfo = await Promise.all(
  762. msg.value.content.recps.map((recipient) => {
  763. return getUserInfo(getRecipientFeedId(recipient));
  764. })
  765. );
  766. }
  767. const { blocking } = await models.friend.getRelationship(
  768. msg.value.author
  769. );
  770. lodash.set(msg, "value.meta.blocking", blocking);
  771. return msg;
  772. })
  773. );
  774. const getLimitPost = async (feedId, reverse) => {
  775. const ssb = await cooler.open();
  776. const source = ssb.createUserStream({ id: feedId, reverse: reverse });
  777. const messages = await new Promise((resolve, reject) => {
  778. pull(
  779. source,
  780. pull.filter((msg) => isDecrypted(msg) === false && isPost(msg)),
  781. pull.take(1),
  782. pull.collect((err, collectedMessages) => {
  783. if (err) {
  784. reject(err);
  785. } else {
  786. resolve(transform(ssb, collectedMessages, feedId));
  787. }
  788. })
  789. );
  790. });
  791. return messages.length ? messages[0] : undefined;
  792. };
  793. const post = {
  794. firstBy: async (feedId) => {
  795. return getLimitPost(feedId, false);
  796. },
  797. latestBy: async (feedId) => {
  798. return getLimitPost(feedId, true);
  799. },
  800. fromPublicFeed: async (feedId, gt = -1, lt = -1, customOptions = {}) => {
  801. const ssb = await cooler.open();
  802. const myFeedId = ssb.id;
  803. let defaultOptions = { id: feedId };
  804. if (lt >= 0) defaultOptions.lt = lt;
  805. if (gt >= 0) defaultOptions.gt = gt;
  806. defaultOptions.reverse = !(gt >= 0 && lt < 0);
  807. const options = configure(defaultOptions, customOptions);
  808. const { blocking } = await models.friend.getRelationship(feedId);
  809. // Avoid streaming any messages from this feed. If we used the social
  810. // filter here it would continue streaming all messages from this author
  811. // until it consumed the entire feed.
  812. if (blocking) {
  813. return [];
  814. }
  815. const source = ssb.createUserStream(options);
  816. const messages = await new Promise((resolve, reject) => {
  817. pull(
  818. source,
  819. pull.filter((msg) => isDecrypted(msg) === false && isTextLike(msg)),
  820. pull.take(maxMessages),
  821. pull.collect((err, collectedMessages) => {
  822. if (err) {
  823. reject(err);
  824. } else {
  825. resolve(transform(ssb, collectedMessages, myFeedId));
  826. }
  827. })
  828. );
  829. });
  830. if (!defaultOptions.reverse) return messages.reverse();
  831. else return messages;
  832. },
  833. mentionsMe: async (customOptions = {}) => {
  834. const ssb = await cooler.open();
  835. const myFeedId = ssb.id;
  836. const query = [
  837. {
  838. $filter: {
  839. dest: myFeedId,
  840. },
  841. },
  842. ];
  843. const messages = await getMessages({
  844. myFeedId,
  845. customOptions,
  846. ssb,
  847. query,
  848. filter: (msg) =>
  849. msg.value.author !== myFeedId &&
  850. lodash.get(msg, "value.meta.private") !== true,
  851. });
  852. return messages;
  853. },
  854. fromHashtag: async (hashtag, customOptions = {}) => {
  855. const ssb = await cooler.open();
  856. const myFeedId = ssb.id;
  857. const query = [
  858. {
  859. $filter: {
  860. dest: `#${hashtag}`,
  861. },
  862. },
  863. ];
  864. const messages = await getMessages({
  865. myFeedId,
  866. customOptions,
  867. ssb,
  868. query,
  869. });
  870. return messages;
  871. },
  872. topicComments: async (rootId, customOptions = {}) => {
  873. const ssb = await cooler.open();
  874. const myFeedId = ssb.id;
  875. const query = [
  876. {
  877. $filter: {
  878. dest: rootId,
  879. },
  880. },
  881. ];
  882. const messages = await getMessages({
  883. myFeedId,
  884. customOptions,
  885. ssb,
  886. query,
  887. filter: (msg) => msg.value.content.root === rootId && hasNoFork(msg),
  888. });
  889. return messages;
  890. },
  891. likes: async ({ feed }, customOptions = {}) => {
  892. const ssb = await cooler.open();
  893. const query = [
  894. {
  895. $filter: {
  896. value: {
  897. author: feed,
  898. timestamp: { $lte: Date.now() },
  899. content: {
  900. type: "vote",
  901. },
  902. },
  903. },
  904. },
  905. ];
  906. const options = configure(
  907. {
  908. query,
  909. reverse: true,
  910. },
  911. customOptions
  912. );
  913. const source = await ssb.query.read(options);
  914. const messages = await new Promise((resolve, reject) => {
  915. pull(
  916. source,
  917. pull.filter((msg) => {
  918. return (
  919. isNotEncrypted(msg) &&
  920. msg.value.author === feed &&
  921. typeof msg.value.content.vote === "object" &&
  922. typeof msg.value.content.vote.link === "string"
  923. );
  924. }),
  925. pull.take(maxMessages),
  926. pull.unique((message) => message.value.content.vote.link),
  927. pullParallelMap(async (val, cb) => {
  928. const msg = await post.get(val.value.content.vote.link);
  929. cb(null, msg);
  930. }),
  931. pull.filter((message) =>
  932. message.value.meta.votes.map((voter) => voter.key).includes(feed)
  933. ),
  934. pull.collect((err, collectedMessages) => {
  935. if (err) {
  936. reject(err);
  937. } else {
  938. resolve(collectedMessages);
  939. }
  940. })
  941. );
  942. });
  943. return messages;
  944. },
  945. search: async ({ query }) => {
  946. const ssb = await cooler.open();
  947. const myFeedId = ssb.id;
  948. const options = configure({
  949. query,
  950. });
  951. const source = await ssb.search.query(options);
  952. const basicSocialFilter = await socialFilter();
  953. const messages = await new Promise((resolve, reject) => {
  954. pull(
  955. source,
  956. basicSocialFilter,
  957. pull.filter(isNotPrivate),
  958. pull.take(maxMessages),
  959. pull.collect((err, collectedMessages) => {
  960. if (err) {
  961. reject(err);
  962. } else {
  963. resolve(transform(ssb, collectedMessages, myFeedId));
  964. }
  965. })
  966. );
  967. });
  968. return messages;
  969. },
  970. latest: async () => {
  971. const ssb = await cooler.open();
  972. const myFeedId = ssb.id;
  973. const source = ssb.query.read(
  974. configure({
  975. query: [
  976. {
  977. $filter: {
  978. value: {
  979. timestamp: { $lte: Date.now() },
  980. content: {
  981. type: { $in: ["post", "blog"] },
  982. },
  983. },
  984. },
  985. },
  986. ],
  987. })
  988. );
  989. const followingFilter = await socialFilter({ following: true });
  990. const messages = await new Promise((resolve, reject) => {
  991. pull(
  992. source,
  993. followingFilter,
  994. publicOnlyFilter,
  995. pull.take(maxMessages),
  996. pull.collect((err, collectedMessages) => {
  997. if (err) {
  998. reject(err);
  999. } else {
  1000. resolve(transform(ssb, collectedMessages, myFeedId));
  1001. }
  1002. })
  1003. );
  1004. });
  1005. return messages;
  1006. },
  1007. latestExtended: async () => {
  1008. const ssb = await cooler.open();
  1009. const myFeedId = ssb.id;
  1010. const source = ssb.query.read(
  1011. configure({
  1012. query: [
  1013. {
  1014. $filter: {
  1015. value: {
  1016. timestamp: { $lte: Date.now() },
  1017. content: {
  1018. type: { $in: ["post", "blog"] },
  1019. },
  1020. },
  1021. },
  1022. },
  1023. ],
  1024. })
  1025. );
  1026. const extendedFilter = await socialFilter({
  1027. following: false,
  1028. me: false,
  1029. });
  1030. const messages = await new Promise((resolve, reject) => {
  1031. pull(
  1032. source,
  1033. publicOnlyFilter,
  1034. extendedFilter,
  1035. pull.take(maxMessages),
  1036. pull.collect((err, collectedMessages) => {
  1037. if (err) {
  1038. reject(err);
  1039. } else {
  1040. resolve(transform(ssb, collectedMessages, myFeedId));
  1041. }
  1042. })
  1043. );
  1044. });
  1045. return messages;
  1046. },
  1047. latestTopics: async () => {
  1048. const ssb = await cooler.open();
  1049. const myFeedId = ssb.id;
  1050. const source = ssb.query.read(
  1051. configure({
  1052. query: [
  1053. {
  1054. $filter: {
  1055. value: {
  1056. timestamp: { $lte: Date.now() },
  1057. content: {
  1058. type: { $in: ["post", "blog"] },
  1059. },
  1060. },
  1061. },
  1062. },
  1063. ],
  1064. })
  1065. );
  1066. const extendedFilter = await socialFilter({
  1067. following: true,
  1068. });
  1069. const messages = await new Promise((resolve, reject) => {
  1070. pull(
  1071. source,
  1072. publicOnlyFilter,
  1073. pull.filter(hasNoRoot),
  1074. extendedFilter,
  1075. pull.take(maxMessages),
  1076. pull.collect((err, collectedMessages) => {
  1077. if (err) {
  1078. reject(err);
  1079. } else {
  1080. resolve(transform(ssb, collectedMessages, myFeedId));
  1081. }
  1082. })
  1083. );
  1084. });
  1085. return messages;
  1086. },
  1087. latestSummaries: async () => {
  1088. const ssb = await cooler.open();
  1089. const myFeedId = ssb.id;
  1090. const options = configure({
  1091. type: "post",
  1092. private: false,
  1093. });
  1094. const source = ssb.messagesByType(options);
  1095. const extendedFilter = await socialFilter({
  1096. following: true,
  1097. });
  1098. const messages = await new Promise((resolve, reject) => {
  1099. pull(
  1100. source,
  1101. pull.filter((message) => isNotPrivate(message) && hasNoRoot(message)),
  1102. extendedFilter,
  1103. pull.take(maxMessages),
  1104. pullParallelMap(async (message, cb) => {
  1105. // Retrieve a preview of this post's comments / thread
  1106. const thread = await post.fromThread(message.key);
  1107. lodash.set(
  1108. message,
  1109. "value.meta.thread",
  1110. await transform(ssb, thread, myFeedId)
  1111. );
  1112. cb(null, message);
  1113. }),
  1114. pull.collect((err, collectedMessages) => {
  1115. if (err) {
  1116. reject(err);
  1117. } else {
  1118. resolve(transform(ssb, collectedMessages, myFeedId));
  1119. }
  1120. })
  1121. );
  1122. });
  1123. return messages;
  1124. },
  1125. latestThreads: async () => {
  1126. const ssb = await cooler.open();
  1127. const myFeedId = ssb.id;
  1128. const source = ssb.query.read(
  1129. configure({
  1130. query: [
  1131. {
  1132. $filter: {
  1133. value: {
  1134. timestamp: { $lte: Date.now() },
  1135. content: {
  1136. type: { $in: ["post", "blog"] },
  1137. },
  1138. },
  1139. },
  1140. },
  1141. ],
  1142. })
  1143. );
  1144. const basicSocialFilter = await socialFilter();
  1145. const messages = await new Promise((resolve, reject) => {
  1146. pull(
  1147. source,
  1148. basicSocialFilter,
  1149. pull.filter((message) => isNotPrivate(message) && hasNoRoot(message)),
  1150. pull.take(maxMessages),
  1151. pullParallelMap(async (message, cb) => {
  1152. // Retrieve a preview of this post's comments / thread
  1153. const thread = await post.fromThread(message.key);
  1154. lodash.set(
  1155. message,
  1156. "value.meta.thread",
  1157. await transform(ssb, thread, myFeedId)
  1158. );
  1159. cb(null, message);
  1160. }),
  1161. pull.filter((message) => message.value.meta.thread.length > 1),
  1162. pull.collect((err, collectedMessages) => {
  1163. if (err) {
  1164. reject(err);
  1165. } else {
  1166. resolve(transform(ssb, collectedMessages, myFeedId));
  1167. }
  1168. })
  1169. );
  1170. });
  1171. return messages;
  1172. },
  1173. popular: async ({ period }) => {
  1174. const ssb = await cooler.open();
  1175. const periodDict = {
  1176. day: 1,
  1177. week: 7,
  1178. month: 30.42,
  1179. year: 365,
  1180. };
  1181. if (period in periodDict === false) {
  1182. throw new Error("invalid period");
  1183. }
  1184. const myFeedId = ssb.id;
  1185. const now = new Date();
  1186. const earliest = Number(now) - 1000 * 60 * 60 * 24 * periodDict[period];
  1187. const source = ssb.query.read(
  1188. configure({
  1189. query: [
  1190. {
  1191. $filter: {
  1192. value: {
  1193. timestamp: { $gte: earliest },
  1194. content: {
  1195. type: "vote",
  1196. },
  1197. },
  1198. },
  1199. },
  1200. ],
  1201. })
  1202. );
  1203. const basicSocialFilter = await socialFilter();
  1204. const messages = await new Promise((resolve, reject) => {
  1205. pull(
  1206. source,
  1207. publicOnlyFilter,
  1208. pull.filter((msg) => {
  1209. return (
  1210. isNotEncrypted(msg) &&
  1211. typeof msg.value.content.vote === "object" &&
  1212. typeof msg.value.content.vote.link === "string" &&
  1213. typeof msg.value.content.vote.value === "number"
  1214. );
  1215. }),
  1216. pull.reduce(
  1217. (acc, cur) => {
  1218. const author = cur.value.author;
  1219. const target = cur.value.content.vote.link;
  1220. const value = cur.value.content.vote.value;
  1221. if (acc[author] == null) {
  1222. acc[author] = {};
  1223. }
  1224. // Only accept values between -1 and 1
  1225. acc[author][target] = Math.max(-1, Math.min(1, value));
  1226. return acc;
  1227. },
  1228. {},
  1229. (err, obj) => {
  1230. if (err) {
  1231. return reject(err);
  1232. }
  1233. // HACK: Can we do this without a reduce()? I think this makes the
  1234. // stream much slower than it needs to be. Also, we should probably
  1235. // be indexing these rather than building the stream on refresh.
  1236. const adjustedObj = Object.entries(obj).reduce(
  1237. (acc, [author, values]) => {
  1238. if (author === myFeedId) {
  1239. return acc;
  1240. }
  1241. // The value of a users vote is 1 / (1 + total votes), the
  1242. // more a user votes, the less weight is given to each vote.
  1243. const entries = Object.entries(values);
  1244. const total = 1 + Math.log(entries.length);
  1245. entries.forEach(([link, value]) => {
  1246. if (acc[link] == null) {
  1247. acc[link] = 0;
  1248. }
  1249. acc[link] += value / total;
  1250. });
  1251. return acc;
  1252. },
  1253. []
  1254. );
  1255. const arr = Object.entries(adjustedObj);
  1256. const length = arr.length;
  1257. pull(
  1258. pull.values(arr),
  1259. pullSort(([, aVal], [, bVal]) => bVal - aVal),
  1260. pull.take(Math.min(length, maxMessages)),
  1261. pull.map(([key]) => key),
  1262. pullParallelMap(async (key, cb) => {
  1263. try {
  1264. const msg = await post.get(key);
  1265. cb(null, msg);
  1266. } catch (e) {
  1267. cb(null, null);
  1268. }
  1269. }),
  1270. // avoid private messages (!) and non-posts
  1271. pull.filter(
  1272. (message) =>
  1273. message &&
  1274. isNotPrivate(message) &&
  1275. (message.value.content.type === "post" ||
  1276. message.value.content.type === "blog")
  1277. ),
  1278. basicSocialFilter,
  1279. pull.collect((collectErr, collectedMessages) => {
  1280. if (collectErr) {
  1281. reject(collectErr);
  1282. } else {
  1283. resolve(collectedMessages);
  1284. }
  1285. })
  1286. );
  1287. }
  1288. )
  1289. );
  1290. });
  1291. return messages;
  1292. },
  1293. fromThread: async (msgId, customOptions) => {
  1294. debug("thread: %s", msgId);
  1295. const ssb = await cooler.open();
  1296. const myFeedId = ssb.id;
  1297. const options = configure({ id: msgId }, customOptions);
  1298. return ssb
  1299. .get(options)
  1300. .then(async (rawMsg) => {
  1301. debug("got raw message");
  1302. const parents = [];
  1303. const getRootAncestor = (msg) =>
  1304. new Promise((resolve, reject) => {
  1305. if (msg.key == null) {
  1306. debug("something is very wrong, we used `{ meta: true }`");
  1307. resolve(parents);
  1308. } else {
  1309. debug("getting root ancestor of %s", msg.key);
  1310. if (isEncrypted(msg)) {
  1311. // Private message we can't decrypt, stop looking for parents.
  1312. debug("private message");
  1313. if (parents.length > 0) {
  1314. // If we already have some parents, return those.
  1315. resolve(parents);
  1316. } else {
  1317. // If we don't know of any parents, resolve this message.
  1318. resolve(msg);
  1319. }
  1320. } else if (msg.value.content.type !== "post") {
  1321. debug("not a post");
  1322. resolve(msg);
  1323. } else if (
  1324. isLooseSubtopic(msg) &&
  1325. ssbRef.isMsg(msg.value.content.fork)
  1326. ) {
  1327. debug("subtopic, get the parent");
  1328. try {
  1329. // It's a subtopic, get the parent!
  1330. ssb
  1331. .get({
  1332. id: msg.value.content.fork,
  1333. meta: true,
  1334. private: true,
  1335. })
  1336. .then((fork) => {
  1337. resolve(getRootAncestor(fork));
  1338. })
  1339. .catch(reject);
  1340. } catch (e) {
  1341. debug(e);
  1342. resolve(msg);
  1343. }
  1344. } else if (
  1345. isLooseComment(msg) &&
  1346. ssbRef.isMsg(msg.value.content.root)
  1347. ) {
  1348. debug("comment: %s", msg.value.content.root);
  1349. try {
  1350. // It's a thread subtopic, get the parent!
  1351. ssb
  1352. .get({
  1353. id: msg.value.content.root,
  1354. meta: true,
  1355. private: true,
  1356. })
  1357. .then((root) => {
  1358. resolve(getRootAncestor(root));
  1359. })
  1360. .catch(reject);
  1361. } catch (e) {
  1362. debug(e);
  1363. resolve(msg);
  1364. }
  1365. } else if (isLooseRoot(msg)) {
  1366. debug("got root ancestor");
  1367. resolve(msg);
  1368. } else {
  1369. // type !== "post", probably
  1370. // this should show up as JSON
  1371. debug(
  1372. "got mysterious root ancestor that fails all known schemas"
  1373. );
  1374. debug("%O", msg);
  1375. resolve(msg);
  1376. }
  1377. }
  1378. });
  1379. const getDirectDescendants = (key) =>
  1380. new Promise((resolve, reject) => {
  1381. const filterQuery = {
  1382. $filter: {
  1383. dest: key,
  1384. },
  1385. };
  1386. const referenceStream = ssb.backlinks.read({
  1387. query: [filterQuery],
  1388. index: "DTA", // use asserted timestamps
  1389. });
  1390. pull(
  1391. referenceStream,
  1392. pull.filter((msg) => {
  1393. if (isTextLike(msg) === false) {
  1394. return false;
  1395. }
  1396. const root = lodash.get(msg, "value.content.root");
  1397. const fork = lodash.get(msg, "value.content.fork");
  1398. if (root !== key && fork !== key) {
  1399. // mention
  1400. return false;
  1401. }
  1402. if (fork === key) {
  1403. // not a subtopic of this post
  1404. // it's a subtopic **of a subtopic** of this post
  1405. return false;
  1406. }
  1407. return true;
  1408. }),
  1409. pull.collect((err, messages) => {
  1410. if (err) {
  1411. reject(err);
  1412. } else {
  1413. resolve(messages || undefined);
  1414. }
  1415. })
  1416. );
  1417. });
  1418. // https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array/flat
  1419. const flattenDeep = (arr1) =>
  1420. arr1.reduce(
  1421. (acc, val) =>
  1422. Array.isArray(val)
  1423. ? acc.concat(flattenDeep(val))
  1424. : acc.concat(val),
  1425. []
  1426. );
  1427. const getDeepDescendants = (key) =>
  1428. new Promise((resolve, reject) => {
  1429. const oneDeeper = async (descendantKey, depth) => {
  1430. const descendants = await getDirectDescendants(descendantKey);
  1431. if (descendants.length === 0) {
  1432. return descendants;
  1433. }
  1434. return Promise.all(
  1435. descendants.map(async (descendant) => {
  1436. const deeperDescendants = await oneDeeper(
  1437. descendant.key,
  1438. depth + 1
  1439. );
  1440. lodash.set(descendant, "value.meta.thread.depth", depth);
  1441. lodash.set(descendant, "value.meta.thread.subtopic", true);
  1442. return [descendant, deeperDescendants];
  1443. })
  1444. );
  1445. };
  1446. oneDeeper(key, 0)
  1447. .then((nested) => {
  1448. const nestedDescendants = [...nested];
  1449. const deepDescendants = flattenDeep(nestedDescendants);
  1450. resolve(deepDescendants);
  1451. })
  1452. .catch(reject);
  1453. });
  1454. const rootAncestor = await getRootAncestor(rawMsg);
  1455. const deepDescendants = await getDeepDescendants(rootAncestor.key);
  1456. const allMessages = [rootAncestor, ...deepDescendants].map(
  1457. (message) => {
  1458. const isThreadTarget = message.key === msgId;
  1459. lodash.set(message, "value.meta.thread.target", isThreadTarget);
  1460. return message;
  1461. }
  1462. );
  1463. return await transform(ssb, allMessages, myFeedId);
  1464. })
  1465. .catch((err) => {
  1466. if (err.name === "NotFoundError") {
  1467. throw new Error(
  1468. "Message not found in the database. You've done nothing wrong. Maybe try again later?"
  1469. );
  1470. } else {
  1471. throw err;
  1472. }
  1473. });
  1474. },
  1475. get: async (msgId, customOptions) => {
  1476. debug("get: %s", msgId);
  1477. const ssb = await cooler.open();
  1478. const myFeedId = ssb.id;
  1479. const options = configure({ id: msgId }, customOptions);
  1480. const rawMsg = await ssb.get(options);
  1481. debug("got raw message");
  1482. const transformed = await transform(ssb, [rawMsg], myFeedId);
  1483. debug("transformed: %O", transformed);
  1484. return transformed[0];
  1485. },
  1486. publish: async (options) => {
  1487. const ssb = await cooler.open();
  1488. const body = { type: "post", ...options };
  1489. debug("Published: %O", body);
  1490. return ssb.publish(body);
  1491. },
  1492. publishProfileEdit: async ({ name, description, image }) => {
  1493. const ssb = await cooler.open();
  1494. if (image.length > 0) {
  1495. // 5 MiB check
  1496. const mebibyte = Math.pow(2, 20);
  1497. const maxSize = 5 * mebibyte;
  1498. if (image.length > maxSize) {
  1499. throw new Error("Image file is too big, maximum size is 5 mebibytes");
  1500. }
  1501. return new Promise((resolve, reject) => {
  1502. pull(
  1503. pull.values([image]),
  1504. ssb.blobs.add((err, blobId) => {
  1505. if (err) {
  1506. reject(err);
  1507. } else {
  1508. const content = {
  1509. type: "about",
  1510. about: ssb.id,
  1511. name,
  1512. description,
  1513. image: blobId,
  1514. };
  1515. debug("Published: %O", content);
  1516. resolve(ssb.publish(content));
  1517. }
  1518. })
  1519. );
  1520. });
  1521. } else {
  1522. const body = { type: "about", about: ssb.id, name, description };
  1523. debug("Published: %O", body);
  1524. return ssb.publish(body);
  1525. }
  1526. },
  1527. publishCustom: async (options) => {
  1528. const ssb = await cooler.open();
  1529. debug("Published: %O", options);
  1530. return ssb.publish(options);
  1531. },
  1532. subtopic: async ({ parent, message }) => {
  1533. message.root = parent.key;
  1534. message.fork = lodash.get(parent, "value.content.root");
  1535. message.branch = await post.branch({ root: parent.key });
  1536. message.type = "post"; // redundant but used for validation
  1537. if (isSubtopic(message) !== true) {
  1538. const messageString = JSON.stringify(message, null, 2);
  1539. throw new Error(`message should be valid subtopic: ${messageString}`);
  1540. }
  1541. return post.publish(message);
  1542. },
  1543. root: async (options) => {
  1544. const message = { type: "post", ...options };
  1545. if (isRoot(message) !== true) {
  1546. const messageString = JSON.stringify(message, null, 2);
  1547. throw new Error(`message should be valid root post: ${messageString}`);
  1548. }
  1549. return post.publish(message);
  1550. },
  1551. comment: async ({ parent, message }) => {
  1552. // Set `root` to `parent`'s root.
  1553. // If `parent` doesn't have a root, use the parent's key.
  1554. // If `parent` has a fork, you must use the parent's key.
  1555. const parentKey = parent.key;
  1556. const parentFork = lodash.get(parent, "value.content.fork");
  1557. const parentRoot = lodash.get(parent, "value.content.root", parentKey);
  1558. if (isDecrypted(parent)) {
  1559. message.recps = lodash
  1560. .get(parent, "value.content.recps", [])
  1561. .map((recipient) => {
  1562. if (
  1563. typeof recipient === "object" &&
  1564. typeof recipient.link === "string" &&
  1565. recipient.link.length
  1566. ) {
  1567. // Some interfaces, like Patchbay, put `{ name, link }` objects in
  1568. // `recps`. The comment schema says this is invalid, so we want to
  1569. // fix the `recps` before publishing.
  1570. return recipient.link;
  1571. } else {
  1572. return recipient;
  1573. }
  1574. });
  1575. if (message.recps.length === 0) {
  1576. throw new Error("Refusing to publish message with no recipients");
  1577. }
  1578. }
  1579. const parentHasFork = parentFork != null;
  1580. message.root = parentHasFork ? parentKey : parentRoot;
  1581. message.branch = await post.branch({ root: parent.key });
  1582. message.type = "post"; // redundant but used for validation
  1583. if (isComment(message) !== true) {
  1584. const messageString = JSON.stringify(message, null, 2);
  1585. throw new Error(`message should be valid comment: ${messageString}`);
  1586. }
  1587. return post.publish(message);
  1588. },
  1589. branch: async ({ root }) => {
  1590. const ssb = await cooler.open();
  1591. const keys = await ssb.tangle.branch(root);
  1592. return keys;
  1593. },
  1594. channels: async () => {
  1595. const ssb = await cooler.open();
  1596. const source = ssb.createUserStream({ id: ssb.id });
  1597. const messages = await new Promise((resolve, reject) => {
  1598. pull(
  1599. source,
  1600. pull.filter((message) => {
  1601. return lodash.get(message, "value.content.type") === "channel"
  1602. ? true
  1603. : false;
  1604. }),
  1605. pull.collect((err, collectedMessages) => {
  1606. if (err) {
  1607. reject(err);
  1608. } else {
  1609. resolve(transform(ssb, collectedMessages, ssb.id));
  1610. }
  1611. })
  1612. );
  1613. });
  1614. const channels = messages.map((msg) => {
  1615. return {
  1616. channel: msg.value.content.channel,
  1617. subscribed: msg.value.content.subscribed,
  1618. };
  1619. });
  1620. let subbedChannels = [];
  1621. channels.forEach((ch) => {
  1622. if (ch.subscribed && !subbedChannels.includes(ch.channel)) {
  1623. subbedChannels.push(ch.channel);
  1624. }
  1625. if (ch.subscribed === false && subbedChannels.includes(ch.channel)) {
  1626. subbedChannels = lodash.pull(subbedChannels, ch.channel);
  1627. }
  1628. });
  1629. return subbedChannels;
  1630. },
  1631. inbox: async (customOptions = {}) => {
  1632. const ssb = await cooler.open();
  1633. const myFeedId = ssb.id;
  1634. const options = configure(
  1635. {
  1636. query: [{ $filter: { dest: ssb.id } }],
  1637. },
  1638. customOptions
  1639. );
  1640. const source = ssb.backlinks.read(options);
  1641. const messages = await new Promise((resolve, reject) => {
  1642. pull(
  1643. source,
  1644. // Make sure we're only getting private messages that are posts.
  1645. pull.filter(
  1646. (message) =>
  1647. isDecrypted(message) &&
  1648. (lodash.get(message, "value.content.type") === "post" ||
  1649. lodash.get(message, "value.content.type") === "blog")
  1650. ),
  1651. pull.unique((message) => {
  1652. const { root } = message.value.content;
  1653. if (root == null) {
  1654. return message.key;
  1655. } else {
  1656. return root;
  1657. }
  1658. }),
  1659. pull.take(maxMessages),
  1660. pull.collect((err, collectedMessages) => {
  1661. if (err) {
  1662. reject(err);
  1663. } else {
  1664. resolve(transform(ssb, collectedMessages, myFeedId));
  1665. }
  1666. })
  1667. );
  1668. });
  1669. return messages;
  1670. },
  1671. };
  1672. models.post = post;
  1673. models.vote = {
  1674. /** @param {{messageKey: string, value: {}, recps: []}} input */
  1675. publish: async ({ messageKey, value, recps }) => {
  1676. const ssb = await cooler.open();
  1677. const branch = await ssb.tangle.branch(messageKey);
  1678. await ssb.publish({
  1679. type: "vote",
  1680. vote: {
  1681. link: messageKey,
  1682. value: Number(value),
  1683. },
  1684. branch,
  1685. recps,
  1686. });
  1687. },
  1688. };
  1689. return models;
  1690. };