MultiCurl.php 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. <?php
  2. namespace Buzz\Client;
  3. use Buzz\Converter\RequestConverter;
  4. use Buzz\Converter\ResponseConverter;
  5. use Buzz\Exception\RequestException;
  6. use Buzz\Message\MessageInterface;
  7. use Buzz\Message\RequestInterface;
  8. use Buzz\Exception\ClientException;
  9. use Psr\Http\Message\RequestInterface as PSR7RequestInterface;
  /**
   * Batch client that queues requests and runs them through a single cURL multi handle.
   *
   * Requests are only queued by send()/sendRequest(); they are registered with
   * the multi handle and driven forward by proceed() or flush().
   */
  class MultiCurl extends AbstractCurl implements BatchClientInterface
  {
      /**
       * Pending and in-flight requests.
       *
       * Each entry is array(request, response|null, options). Once the request
       * has been registered with the multi handle, its cURL handle is appended
       * at index 3.
       *
       * @var array
       */
      private $queue = [];

      /**
       * The cURL multi handle, created lazily by proceed() and closed again
       * as soon as the queue is empty.
       */
      private $curlm;

      /**
       * Populates the supplied response with the response for the supplied request.
       *
       * The array of options will be passed to curl_setopt_array().
       *
       * If a "callback" option is supplied, its value will be called when the
       * request completes. The callable should have the following signature:
       *
       * $callback = function($client, $request, $response, $options, $error) {
       *     if (!$error) {
       *         // success
       *     } else {
       *         // error ($error is one of the CURLE_* constants)
       *     }
       * };
       *
       * If the "psr7_response" option is strictly true, the callback receives
       * the PSR-7 response instead of the supplied response object (null when
       * the transfer failed).
       *
       * @param RequestInterface $request  A request object
       * @param MessageInterface $response A response object
       * @param array            $options  An array of options
       *
       * @deprecated Will be removed in 1.0. Use sendRequest instead.
       */
      public function send(RequestInterface $request, MessageInterface $response, array $options = [])
      {
          @trigger_error('MultiCurl::send() is deprecated. Use MultiCurl::sendRequest instead.', E_USER_DEPRECATED);

          $this->queue[] = [RequestConverter::psr7($request), $response, $options];
      }

      /**
       * Queues a PSR-7 request; nothing is transferred until proceed() or flush() is called.
       *
       * Because no response object is supplied, a "callback" option (same
       * signature as documented on send()) receives the PSR-7 response on
       * success and null on failure.
       *
       * @param PSR7RequestInterface $request A PSR-7 request
       * @param array                $options An array of options
       */
      public function sendRequest(PSR7RequestInterface $request, $options = [])
      {
          $this->queue[] = [$request, null, $options];
      }

      /**
       * Returns the number of requests that have been queued and not yet completed.
       *
       * @return int
       */
      public function count()
      {
          return count($this->queue);
      }

      /**
       * Runs proceed() until every queued request has completed.
       *
       * @throws ClientException  If the cURL multi handle cannot be created
       * @throws RequestException If a transfer fails (raised by proceed())
       */
      public function flush()
      {
          while ($this->queue) {
              $this->proceed();

              // Block briefly until a socket has activity instead of spinning
              // on curl_multi_exec(). The return value is deliberately ignored:
              // on failure or timeout we simply run another pass.
              if ($this->queue && $this->curlm) {
                  curl_multi_select($this->curlm, 0.1);
              }
          }
      }

      /**
       * Performs one non-blocking processing pass over the queue.
       *
       * Registers newly queued requests with the multi handle, lets cURL make
       * progress, then finalises every transfer that has completed: the
       * response is populated, the entry is removed from the queue and the
       * optional callback is invoked.
       *
       * @throws ClientException  If the cURL multi handle cannot be created
       * @throws RequestException For the first failed transfer seen in this pass;
       *                          thrown only after all completed transfers
       *                          have been handled
       */
      public function proceed()
      {
          if (!$this->queue) {
              return;
          }

          if (!$this->curlm && false === $this->curlm = curl_multi_init()) {
              throw new ClientException('Unable to create a new cURL multi handle');
          }

          // Register every entry that does not have a cURL handle yet.
          foreach (array_keys($this->queue) as $i) {
              if (isset($this->queue[$i][3])) {
                  continue;
              }

              list($request, , $options) = $this->queue[$i];
              $curl = static::createCurlHandle();

              // "callback" is a custom option, not something prepare() should see
              unset($options['callback']);
              $this->prepare($curl, $request, $options);

              $this->queue[$i][3] = $curl;
              curl_multi_add_handle($this->curlm, $curl);
          }

          // process outstanding perform
          $active = null;
          do {
              $mrc = curl_multi_exec($this->curlm, $active);
          } while ($active && CURLM_CALL_MULTI_PERFORM == $mrc);

          // Only the first failure is kept; it is thrown once this pass is over.
          $exception = null;

          // handle any completed requests
          while ($done = curl_multi_info_read($this->curlm)) {
              foreach (array_keys($this->queue) as $i) {
                  // Entries queued by a callback during this pass have no handle yet.
                  if (!isset($this->queue[$i][3]) || $this->queue[$i][3] !== $done['handle']) {
                      continue;
                  }

                  list($request, $response, $options, $curl) = $this->queue[$i];

                  // Reset per transfer so a failed request never sees the
                  // response of a previously completed one.
                  $psr7Response = null;

                  if (CURLE_OK === $done['result']) {
                      $psr7Response = $this->createResponse($curl, curl_multi_getcontent($curl));

                      // Requests queued through sendRequest() have no response object to fill.
                      if (null !== $response) {
                          ResponseConverter::copy(ResponseConverter::buzz($psr7Response), $response);
                      }
                  } elseif (null === $exception) {
                      $exception = new RequestException(curl_error($curl), curl_errno($curl));
                      $exception->setRequest($request);
                  }

                  // remove from queue
                  curl_multi_remove_handle($this->curlm, $curl);
                  curl_close($curl);
                  unset($this->queue[$i]);

                  // callback
                  if (isset($options['callback'])) {
                      $wantsPsr7 = isset($options['psr7_response']) && true === $options['psr7_response'];
                      $returnResponse = ($wantsPsr7 || null === $response) ? $psr7Response : $response;

                      call_user_func($options['callback'], $this, $request, $returnResponse, $options, $done['result']);
                  }

                  // A handle belongs to exactly one queue entry.
                  break;
              }
          }

          // cleanup
          if (!$this->queue) {
              curl_multi_close($this->curlm);
              $this->curlm = null;
          }

          if (null !== $exception) {
              throw $exception;
          }
      }
  }