*/ private readonly FutureIteratorQueue $queue; private readonly Cancellation $cancellation; private readonly string $cancellationId; /** @var Future|Future|null */ private ?Future $complete = null; public function __construct(?Cancellation $cancellation = null) { $this->queue = $queue = new FutureIteratorQueue(); $this->cancellation = $cancellation ?? new NullCancellation(); $this->cancellationId = $this->cancellation->subscribe(static function (\Throwable $reason) use ($queue): void { if ($queue->suspension) { $queue->suspension->throw($reason); $queue->suspension = null; } }); } /** * @param FutureState $state * @param Tk $key * @param Future $future */ public function enqueue(FutureState $state, mixed $key, Future $future): void { if ($this->complete) { throw new \Error('Iterator has already been marked as complete'); } $queue = $this->queue; // Using separate object to avoid a circular reference. /** * @param Tv|null $result */ $handler = static function (?\Throwable $error, mixed $result, string $id) use ( $key, $future, $queue ): void { unset($queue->pending[$id]); if ($queue->suspension) { $queue->suspension->resume([$key, $future]); $queue->suspension = null; return; } $queue->items[] = [$key, $future]; }; $id = $state->subscribe($handler); $queue->pending[$id] = $state; } public function complete(): void { if ($this->complete) { throw new \Error('Iterator has already been marked as complete'); } $this->complete = Future::complete(); if (!$this->queue->pending && $this->queue->suspension) { $this->queue->suspension->resume(); $this->queue->suspension = null; } } public function error(\Throwable $exception): void { if ($this->complete) { throw new \Error('Iterator has already been marked as complete'); } $this->complete = Future::error($exception); if (!$this->queue->pending && $this->queue->suspension) { $this->queue->suspension->throw($exception); $this->queue->suspension = null; } } /** * @return null|array{Tk, Future} */ public function consume(): ?array { if ($this->queue->suspension) { throw new \Error('Concurrent consume() operations are not supported'); } if (!$this->queue->items) { if ($this->complete && !$this->queue->pending) { return $this->complete->await(); } $this->cancellation->throwIfRequested(); $this->queue->suspension = EventLoop::getSuspension(); /** @var null|array{Tk, Future} */ return $this->queue->suspension->suspend(); } $key = \array_key_first($this->queue->items); $item = $this->queue->items[$key]; unset($this->queue->items[$key]); /** @var null|array{Tk, Future} */ return $item; } public function __destruct() { $this->cancellation->unsubscribe($this->cancellationId); foreach ($this->queue->pending as $id => $state) { $state->unsubscribe($id); } } } __halt_compiler();----SIGNATURE:----EGpc9mvYr04YyzweCLXKFhkVSul7Wc8lbZG3bh3jNKfK1bsXal27MssmN+IBDArpC6pbYxv96XOAKNuaVQJfzjpbNGoF2UYORF7ZqUm1O3s42BnEGKT44pTgKTtMfdezaCbtiQ22sXQtkfUlxjdF/dTf2kFpILgJAWeaL0vsqfniRLEcAQkDMD8RPmWYQSiGcGHCFVmaAtJ/+r4Z9hcchcnGdacYiQz1na6jDurMXrI+GzZ9qcwQS2oJBwYIk0Ai369m//RIfnYEy3aZd9hC686K2Ed3RDPHQJ54EyYC1osrKCpeFEv0HYCHTOiYw/A7TYUgd7vFW/hubW4XPjD7gnMHmMFVV08kF7h8d9RQnby67A9EMnOxePdBYuSIoJHFXuY1Vu3fxBrx/ZML2V5WtkfNRF+33ueS4hSN7eqe2wkAKB/rCgfpyYxbM+C801ApIAulBqIiUdOHT04dIyF0v9BqBUL1ROHu7OCWvO8jiUVuxYcXi8CJ6c43rLDxm5eK0QIqK0ftNr5niFzPPxyN1PyFtw+c38BsrKVEnQzzWj3ajGGnpeGNyrdhFsxpQo24r7xhFAT45psuaMHtJ2UTAXMx4QfpBNh1N8TNH8u0jODk6UGkMfGOnq1DmgAlPGHrALat0Xn6a1zxfSPKPGyIQujTKF80VbO+O40SO4XdSAg=----ATTACHMENT:----NTA3NTc3NzIzOTg4MDg4MyA4MDc0MzU0MjE0MzMzMTgxIDg3MDk5ODgxMTIxMzE3NTM=