DatabaseQueue.php 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. <?php
  2. namespace Elgg\Queue;
  3. /**
  4. * FIFO queue that uses the database for persistence
  5. *
  6. * WARNING: API IN FLUX. DO NOT USE DIRECTLY.
  7. *
  8. * @access private
  9. *
  10. * @package Elgg.Core
  11. * @subpackage Queue
  12. * @since 1.9.0
  13. */
  14. class DatabaseQueue implements \Elgg\Queue\Queue {
  15. /** @var string Name of the queue */
  16. protected $name;
  17. /** @var \Elgg\Database Database adapter */
  18. protected $db;
  19. /** @var string The identifier of the worker pulling from the queue */
  20. protected $workerId;
  21. /**
  22. * Create a queue
  23. *
  24. * @param string $name Name of the queue. Must be less than 256 characters.
  25. * @param \Elgg\Database $db Database adapter
  26. */
  27. public function __construct($name, \Elgg\Database $db) {
  28. $this->db = $db;
  29. $this->name = $this->db->sanitizeString($name);
  30. $this->workerId = $this->db->sanitizeString(md5(microtime() . getmypid()));
  31. }
  32. /**
  33. * {@inheritdoc}
  34. */
  35. public function enqueue($item) {
  36. $prefix = $this->db->getTablePrefix();
  37. $blob = $this->db->sanitizeString(serialize($item));
  38. $time = time();
  39. $query = "INSERT INTO {$prefix}queue
  40. SET name = '$this->name', data = '$blob', timestamp = $time";
  41. return $this->db->insertData($query) !== false;
  42. }
  43. /**
  44. * {@inheritdoc}
  45. */
  46. public function dequeue() {
  47. $prefix = $this->db->getTablePrefix();
  48. $update = "UPDATE {$prefix}queue
  49. SET worker = '$this->workerId'
  50. WHERE name = '$this->name' AND worker IS NULL
  51. ORDER BY id ASC LIMIT 1";
  52. $num = $this->db->updateData($update, true);
  53. if ($num === 1) {
  54. $select = "SELECT data FROM {$prefix}queue
  55. WHERE worker = '$this->workerId'";
  56. $obj = $this->db->getDataRow($select);
  57. if ($obj) {
  58. $data = unserialize($obj->data);
  59. $delete = "DELETE FROM {$prefix}queue
  60. WHERE name = '$this->name' AND worker = '$this->workerId'";
  61. $this->db->deleteData($delete);
  62. return $data;
  63. }
  64. }
  65. return null;
  66. }
  67. /**
  68. * {@inheritdoc}
  69. */
  70. public function clear() {
  71. $prefix = $this->db->getTablePrefix();
  72. $this->db->deleteData("DELETE FROM {$prefix}queue WHERE name = '$this->name'");
  73. }
  74. /**
  75. * {@inheritdoc}
  76. */
  77. public function size() {
  78. $prefix = $this->db->getTablePrefix();
  79. $result = $this->db->getDataRow("SELECT COUNT(id) AS total FROM {$prefix}queue WHERE name = '$this->name'");
  80. return (int)$result->total;
  81. }
  82. }