Server : Apache System : Linux indy02.toastserver.com 3.10.0-962.3.2.lve1.5.85.el7.x86_64 #1 SMP Thu Apr 18 15:18:36 UTC 2024 x86_64 User : palandch ( 1163) PHP Version : 7.1.33 Disable Function : NONE Directory : /home/palandch/Cloudflare-CPanel-7.0.1/vendor/guzzlehttp/guzzle/src/ |
<?php namespace GuzzleHttp; use GuzzleHttp\Event\BeforeEvent; use GuzzleHttp\Event\RequestEvents; use GuzzleHttp\Message\RequestInterface; use GuzzleHttp\Message\ResponseInterface; use GuzzleHttp\Ring\Core; use GuzzleHttp\Ring\Future\FutureInterface; use GuzzleHttp\Event\ListenerAttacherTrait; use GuzzleHttp\Event\EndEvent; use React\Promise\Deferred; use React\Promise\FulfilledPromise; use React\Promise\PromiseInterface; use React\Promise\RejectedPromise; /** * Sends and iterator of requests concurrently using a capped pool size. * * The Pool object implements FutureInterface, meaning it can be used later * when necessary, the requests provided to the pool can be cancelled, and * you can check the state of the pool to know if it has been dereferenced * (sent) or has been cancelled. * * When sending the pool, keep in mind that no results are returned: callers * are expected to handle results asynchronously using Guzzle's event system. * When requests complete, more are added to the pool to ensure that the * requested pool size is always filled as much as possible. * * IMPORTANT: Do not provide a pool size greater that what the utilized * underlying RingPHP handler can support. This will result is extremely poor * performance. */ class Pool implements FutureInterface { use ListenerAttacherTrait; /** @var \GuzzleHttp\ClientInterface */ private $client; /** @var \Iterator Yields requests */ private $iter; /** @var Deferred */ private $deferred; /** @var PromiseInterface */ private $promise; private $waitQueue = []; private $eventListeners = []; private $poolSize; private $isRealized = false; /** * The option values for 'before', 'complete', 'error' and 'end' can be a * callable, an associative array containing event data, or an array of * event data arrays. Event data arrays contain the following keys: * * - fn: callable to invoke that receives the event * - priority: Optional event priority (defaults to 0) * - once: Set to true so that the event is removed after it is triggered * * @param ClientInterface $client Client used to send the requests. * @param array|\Iterator $requests Requests to send in parallel * @param array $options Associative array of options * - pool_size: (callable|int) Maximum number of requests to send * concurrently, or a callback that receives * the current queue size and returns the * number of new requests to send * - before: (callable|array) Receives a BeforeEvent * - complete: (callable|array) Receives a CompleteEvent * - error: (callable|array) Receives a ErrorEvent * - end: (callable|array) Receives an EndEvent */ public function __construct( ClientInterface $client, $requests, array $options = [] ) { $this->client = $client; $this->iter = $this->coerceIterable($requests); $this->deferred = new Deferred(); $this->promise = $this->deferred->promise(); $this->poolSize = isset($options['pool_size']) ? $options['pool_size'] : 25; $this->eventListeners = $this->prepareListeners( $options, ['before', 'complete', 'error', 'end'] ); } /** * Sends multiple requests in parallel and returns an array of responses * and exceptions that uses the same ordering as the provided requests. * * IMPORTANT: This method keeps every request and response in memory, and * as such, is NOT recommended when sending a large number or an * indeterminate number of requests concurrently. * * @param ClientInterface $client Client used to send the requests * @param array|\Iterator $requests Requests to send in parallel * @param array $options Passes through the options available in * {@see GuzzleHttp\Pool::__construct} * * @return BatchResults Returns a container for the results. * @throws \InvalidArgumentException if the event format is incorrect. */ public static function batch( ClientInterface $client, $requests, array $options = [] ) { $hash = new \SplObjectStorage(); foreach ($requests as $request) { $hash->attach($request); } // In addition to the normally run events when requests complete, add // and event to continuously track the results of transfers in the hash. (new self($client, $requests, RequestEvents::convertEventArray( $options, ['end'], [ 'priority' => RequestEvents::LATE, 'fn' => function (EndEvent $e) use ($hash) { $hash[$e->getRequest()] = $e->getException() ? $e->getException() : $e->getResponse(); } ] )))->wait(); return new BatchResults($hash); } /** * Creates a Pool and immediately sends the requests. * * @param ClientInterface $client Client used to send the requests * @param array|\Iterator $requests Requests to send in parallel * @param array $options Passes through the options available in * {@see GuzzleHttp\Pool::__construct} */ public static function send( ClientInterface $client, $requests, array $options = [] ) { $pool = new self($client, $requests, $options); $pool->wait(); } private function getPoolSize() { return is_callable($this->poolSize) ? call_user_func($this->poolSize, count($this->waitQueue)) : $this->poolSize; } /** * Add as many requests as possible up to the current pool limit. */ private function addNextRequests() { $limit = max($this->getPoolSize() - count($this->waitQueue), 0); while ($limit--) { if (!$this->addNextRequest()) { break; } } } public function wait() { if ($this->isRealized) { return false; } // Seed the pool with N number of requests. $this->addNextRequests(); // Stop if the pool was cancelled while transferring requests. if ($this->isRealized) { return false; } // Wait on any outstanding FutureResponse objects. while ($response = array_pop($this->waitQueue)) { try { $response->wait(); } catch (\Exception $e) { // Eat exceptions because they should be handled asynchronously } $this->addNextRequests(); } // Clean up no longer needed state. $this->isRealized = true; $this->waitQueue = $this->eventListeners = []; $this->client = $this->iter = null; $this->deferred->resolve(true); return true; } /** * {@inheritdoc} * * Attempt to cancel all outstanding requests (requests that are queued for * dereferencing). Returns true if all outstanding requests can be * cancelled. * * @return bool */ public function cancel() { if ($this->isRealized) { return false; } $success = $this->isRealized = true; foreach ($this->waitQueue as $response) { if (!$response->cancel()) { $success = false; } } return $success; } /** * Returns a promise that is invoked when the pool completed. There will be * no passed value. * * {@inheritdoc} */ public function then( callable $onFulfilled = null, callable $onRejected = null, callable $onProgress = null ) { return $this->promise->then($onFulfilled, $onRejected, $onProgress); } public function promise() { return $this->promise; } private function coerceIterable($requests) { if ($requests instanceof \Iterator) { return $requests; } elseif (is_array($requests)) { return new \ArrayIterator($requests); } throw new \InvalidArgumentException('Expected Iterator or array. ' . 'Found ' . Core::describeType($requests)); } /** * Adds the next request to pool and tracks what requests need to be * dereferenced when completing the pool. */ private function addNextRequest() { add_next: if ($this->isRealized || !$this->iter || !$this->iter->valid()) { return false; } $request = $this->iter->current(); $this->iter->next(); if (!($request instanceof RequestInterface)) { throw new \InvalidArgumentException(sprintf( 'All requests in the provided iterator must implement ' . 'RequestInterface. Found %s', Core::describeType($request) )); } // Be sure to use "lazy" futures, meaning they do not send right away. $request->getConfig()->set('future', 'lazy'); $hash = spl_object_hash($request); $this->attachListeners($request, $this->eventListeners); $request->getEmitter()->on('before', [$this, '_trackRetries'], RequestEvents::EARLY); $response = $this->client->send($request); $this->waitQueue[$hash] = $response; $promise = $response->promise(); // Don't recursively call itself for completed or rejected responses. if ($promise instanceof FulfilledPromise || $promise instanceof RejectedPromise ) { try { $this->finishResponse($request, $response->wait(), $hash); } catch (\Exception $e) { $this->finishResponse($request, $e, $hash); } goto add_next; } // Use this function for both resolution and rejection. $thenFn = function ($value) use ($request, $hash) { $this->finishResponse($request, $value, $hash); if (!$request->getConfig()->get('_pool_retries')) { $this->addNextRequests(); } }; $promise->then($thenFn, $thenFn); return true; } public function _trackRetries(BeforeEvent $e) { $e->getRequest()->getConfig()->set('_pool_retries', $e->getRetryCount()); } private function finishResponse($request, $value, $hash) { unset($this->waitQueue[$hash]); $result = $value instanceof ResponseInterface ? ['request' => $request, 'response' => $value, 'error' => null] : ['request' => $request, 'response' => null, 'error' => $value]; $this->deferred->notify($result); } }