ó
Ì	g]c           @   sÀ  d  Z  d d l Z d d l Z d d l Z d d l Z d d l Z d d l Z d d l Z d d l Z d d l	 Z	 d d l
 Z
 d d l m Z m Z d d l Z d d l Z e j e j f Z e j e j f Z e j e ƒ Z e j j e ƒ Z d „  Z d „  Z d „  Z d e f d „  ƒ  YZ  d	 e f d
 „  ƒ  YZ! d e f d „  ƒ  YZ" d e f d „  ƒ  YZ# d e f d „  ƒ  YZ$ d e f d „  ƒ  YZ% d e% f d „  ƒ  YZ& d e% f d „  ƒ  YZ' d S(   s^   Utilities for implementing `nbio_interface.AbstractIOServices` for
pika connection adapters.

iÿÿÿÿN(   t   AbstractIOReferencet   AbstractStreamTransportc         C   s+   t  |  ƒ s' t d j | |  ƒ ƒ ‚ n  d S(   s£   Raise TypeError if callback is not callable

    :param callback: callback to check
    :param name: Name to include in exception text
    :raises TypeError:

    s!   {} must be callable, but got {!r}N(   t   callablet	   TypeErrort   format(   t   callbackt   name(    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyt   check_callback_arg,   s    	c         C   s.   t  |  t j ƒ s* t d j |  ƒ ƒ ‚ n  d S(   sq   Raise TypeError if file descriptor is not an integer

    :param fd: file descriptor
    :raises TypeError:

    s0   Paramter must be a file descriptor, but got {!r}N(   t
   isinstancet   numberst   IntegralR   R   (   t   fd(    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyt   check_fd_arg9   s    c            s"   t  j ˆ  ƒ ‡  f d †  ƒ } | S(   s0   Function decorator for retrying on SIGINT.

    c             sY   xR t  rT y ˆ  |  | Ž  SWq t j j k
 rP } | j t j k rJ q qQ ‚  q Xq Wd S(   s   Wrapper for decorated functionN(   t   Truet   pikat   compatt   SOCKET_ERRORt   errnot   EINTR(   t   argst   kwargst   error(   t   func(    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyt   retry_sigint_wrapJ   s    	(   t	   functoolst   wraps(   R   R   (    (   R   sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyt   _retry_on_sigintE   s    t   SocketConnectionMixinc           B   s   e  Z d  Z d „  Z RS(   sú   Implements
    `pika.adapters.utils.nbio_interface.AbstractIOServices.connect_socket()`
    on top of
    `pika.adapters.utils.nbio_interface.AbstractFileDescriptorServices` and
    basic `pika.adapters.utils.nbio_interface.AbstractIOServices`.

    c      	   C   s%   t  d |  d | d | d | ƒ j ƒ  S(   s[   Implement
        :py:meth:`.nbio_interface.AbstractIOServices.connect_socket()`.

        t   nbiot   sockt   resolved_addrt   on_done(   t   _AsyncSocketConnectort   start(   t   selfR   R   R   (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyt   connect_socketb   s    (   t   __name__t
   __module__t   __doc__R#   (    (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyR   Y   s   t   StreamingConnectionMixinc           B   s   e  Z d  Z d d d „ Z RS(   sÒ   Implements
    `.nbio_interface.AbstractIOServices.create_streaming_connection()` on
    top of `.nbio_interface.AbstractFileDescriptorServices` and basic
    `nbio_interface.AbstractIOServices` services.

    c         C   sœ   y5 t  d |  d | d | d | d | d | ƒ j ƒ  SWn` t k
 r— } t j d | | ƒ y | j ƒ  Wn& t k
 r } t j d | | ƒ n X‚  n Xd	 S(
   sh   Implement
        :py:meth:`.nbio_interface.AbstractIOServices.create_streaming_connection()`.

        R   t   protocol_factoryR   t   ssl_contextt   server_hostnameR   s*   create_streaming_connection(%s) failed: %rs   %s.close() failed: %rN(   t   _AsyncStreamConnectorR!   t	   Exceptiont   _LOGGERR   t   close(   R"   R(   R   R   R)   R*   R   (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyt   create_streaming_connectiont   s     
N(   R$   R%   R&   t   NoneR/   (    (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyR'   l   s   t   _AsyncServiceAsyncHandlec           B   s    e  Z d  Z d „  Z d „  Z RS(   sG   This module's adaptation of `.nbio_interface.AbstractIOReference`

    c         C   s   | j  |  _ d S(   sZ   
        :param subject: subject of the reference containing a `cancel()` method

        N(   t   cancelt   _cancel(   R"   t   subject(    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyt   __init__š   s    c         C   s
   |  j  ƒ  S(   s   Cancel pending operation

        :returns: False if was already done or cancelled; True otherwise
        :rtype: bool

        (   R3   (   R"   (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyR2   ¡   s    (   R$   R%   R&   R5   R2   (    (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyR1   •   s   	R    c           B   s}   e  Z d  Z d Z d Z d Z d Z d „  Z e d „  ƒ Z	 d „  Z
 d „  Z e d	 „  ƒ Z e d
 „  ƒ Z e d „  ƒ Z RS(   sú   Connects the given non-blocking socket asynchronously using
    `.nbio_interface.AbstractFileDescriptorServices` and basic
    `.nbio_interface.AbstractIOServices`. Used for implementing
    `.nbio_interface.AbstractIOServices.connect_socket()`.
    i    i   i   i   c         C   sÈ   t  | d ƒ y t j | j | d ƒ Wn` t k
 rŠ } t t d ƒ sY t j d ƒ q‹ d j | | | ƒ } t j	 | ƒ t
 | ƒ ‚ n X| |  _ | |  _ | |  _ | |  _ |  j |  _ t |  _ d S(   s  
        :param AbstractIOServices | AbstractFileDescriptorServices nbio:
        :param socket.socket sock: non-blocking socket that needs to be
            connected via `socket.socket.connect()`
        :param tuple resolved_addr: resolved destination address/port two-tuple
            which is compatible with the given's socket's address family
        :param callable on_done: user callback that takes None upon successful
            completion or exception upon error (check for `BaseException`) as
            its only arg. It will not be called if the operation was cancelled.
        :raises ValueError: if host portion of `resolved_addr` is not an IP
            address or is inconsistent with the socket's address family as
            validated via `socket.inet_pton()`
        R   i    t	   inet_ptons8   Unable to check resolved address: no socket.inet_pton().s9   Invalid or unresolved IP address {!r} for socket {}: {!r}N(   R   t   socketR6   t   familyR,   t   hasattrR-   t   debugR   R   t
   ValueErrort   _nbiot   _sockt   _addrt   _on_donet   _STATE_NOT_STARTEDt   _statet   Falset   _watching_socket_events(   R"   R   R   R   R   R   t   msg(    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyR5   ·   s"    
				c         C   s2   |  j  r. t |  _  |  j j |  j j ƒ  ƒ n  d S(   s'   Remove socket watcher, if any

        N(   RC   RB   R<   t   remove_writerR=   t   fileno(   R"   (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyt   _cleanupÛ   s    		c         C   sP   |  j  |  j k s' t d |  j  f ƒ ‚ |  j |  _  |  j j |  j ƒ t |  ƒ S(   sZ   Start asynchronous connection establishment.

        :rtype: AbstractIOReference
        s:   _AsyncSocketConnector.start(): expected _STATE_NOT_STARTED(   RA   R@   t   AssertionErrort   _STATE_ACTIVER<   t   add_callback_threadsafet   _start_asyncR1   (   R"   (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyR!   ä   s    c         C   sb   |  j  |  j k rE |  j |  _  t j d |  j |  j ƒ |  j ƒ  t St j d |  j  |  j ƒ t	 S(   s¾   Cancel pending connection request without calling user's completion
        callback.

        :returns: False if was already done or cancelled; True otherwise
        :rtype: bool

        s-   User canceled connection request for %s to %ssD   _AsyncSocketConnector cancel requested when not ACTIVE: state=%s; %s(
   RA   RI   t   _STATE_CANCELEDR-   R:   R=   R>   RG   R   RB   (   R"   (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyR2   õ   s    	
c         C   s‘   t  j d | |  j ƒ t | t t d ƒ f ƒ sC t d | f ƒ ‚ |  j |  j	 k sj t d |  j f ƒ ‚ |  j
 |  _ |  j ƒ  |  j | ƒ d S(   s¹   Advance to COMPLETED state, remove socket watcher, and invoke user's
        completion callback.

        :param BaseException | None result: value to pass in user's callback

        s0   _AsyncSocketConnector._report_completion(%r); %ssP   _AsyncSocketConnector._report_completion() expected exception or None as result.sF   _AsyncSocketConnector._report_completion() expected _STATE_NOT_STARTEDN(   R-   R:   R=   R   t   BaseExceptiont   typeR0   RH   RA   RI   t   _STATE_COMPLETEDRG   R?   (   R"   t   result(    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyt   _report_completion	  s    	
c         C   s?  |  j  |  j k r5 t j d |  j |  j |  j  ƒ d Sy |  j j |  j ƒ Wns t t j	 j
 f k
 rÁ } t | t j	 j
 ƒ r‘ | j t k r‘ qÂ t j d |  j |  j | ƒ |  j | ƒ d Sn Xy# |  j j |  j j ƒ  |  j ƒ Wn7 t k
 r} t j d |  j | ƒ |  j | ƒ d SXt |  _ t j d |  j ƒ d S(   s“   Called as callback from I/O loop to kick-start the workflow, so it's
        safe to call user's completion callback from here, if needed

        sJ   Abandoning sock=%s connection establishment to %s due to inactive state=%sNs   %s.connect(%s) failed: %rs   async.set_writer(%s) failed: %rs/   Connection-establishment is in progress for %s.(   RA   RI   R-   R:   R=   R>   t   connectR,   R   R   R   R   R   t(   _CONNECTION_IN_PROGRESS_SOCK_ERROR_CODESR   RQ   R<   t
   set_writerRF   t   _on_writablet	   exceptionR   RC   (   R"   R   (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyRK      s2    #		c         C   sº   |  j  |  j k r/ t j d |  j |  j  ƒ d S|  j j t j t j ƒ } | sl t j	 d |  j ƒ d } n= t j | ƒ } t j d |  j | | ƒ t j j | | ƒ } |  j | ƒ d S(   sw   Called when socket connects or fails to. Check for predicament and
        invoke user's completion callback.

        s_   Socket connection-establishment event watcher called in inactive state (ignoring): %s; state=%sNs   Socket connected: %ss+   Socket failed to connect: %s; error=%s (%s)(   RA   RI   R-   R   R=   t
   getsockoptR7   t
   SOL_SOCKETt   SO_ERRORt   infoR0   t   ost   strerrorR   R   R   RQ   (   R"   t
   error_codeRP   t	   error_msg(    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyRU   G  s    
		(   R$   R%   R&   R@   RI   RL   RO   R5   t   _log_exceptionsRG   R!   R2   RQ   RK   RU   (    (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyR    «   s   	$			'R+   c           B   sŒ   e  Z d  Z d Z d Z d Z d Z d „  Z e d „  ƒ Z	 d „  Z
 d „  Z e d	 „  ƒ Z e d
 „  ƒ Z e d „  ƒ Z e d „  ƒ Z RS(   sù   Performs asynchronous SSL session establishment, if requested, on the
    already-connected socket and links the streaming transport to protocol.
    Used for implementing
    `.nbio_interface.AbstractIOServices.create_streaming_connection()`.

    i    i   i   i   c         C   s  t  | d ƒ t  | d ƒ t | t d ƒ t j f ƒ sP t d j | ƒ ƒ ‚ n  | d k	 rw | d k rw t d ƒ ‚ n  y | j ƒ  Wn+ t	 k
 r² } t d j | | ƒ ƒ ‚ n X| |  _
 | |  _ | |  _ | |  _ | |  _ | |  _ |  j |  _ t |  _ d S(   s  
        NOTE: We take ownership of the given socket upon successful completion
        of the constructor.

        See `AbstractIOServices.create_streaming_connection()` for detailed
        documentation of the corresponding args.

        :param AbstractIOServices | AbstractFileDescriptorServices nbio:
        :param callable protocol_factory:
        :param socket.socket sock:
        :param ssl.SSLContext | None ssl_context:
        :param str | None server_hostname:
        :param callable on_done:

        R(   R   s8   Expected ssl_context=None | ssl.SSLContext, but got {!r}s?   Non-None server_hostname must not be passed without ssl contextsE   Expected connected socket, but getpeername() failed: error={!r}; {}; N(   R   R   RN   R0   t   sslt
   SSLContextR;   R   t   getpeernameR,   R<   t   _protocol_factoryR=   t   _ssl_contextt   _server_hostnameR?   R@   RA   RB   t   _watching_socket(   R"   R   R(   R   R)   R*   R   R   (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyR5   p  s*    							c         C   s  t  j d | ƒ |  j rm t  j d | |  j ƒ t |  _ |  j j |  j j ƒ  ƒ |  j j |  j j ƒ  ƒ n  zc | rÏ t  j d | |  j ƒ y |  j j	 ƒ  WqÏ t
 k
 rË } t  j d | |  j ƒ ‚  qÏ Xn  Wd d |  _ d |  _ d |  _ d |  _ d |  _ d |  _ Xd S(   se   Cancel pending async operations, if any

        :param bool close: close the socket if true
        s"   _AsyncStreamConnector._cleanup(%r)s5   _AsyncStreamConnector._cleanup(%r): removing RdWr; %ss6   _AsyncStreamConnector._cleanup(%r): closing socket; %ss"   _sock.close() failed: error=%r; %sN(   R-   R:   Rf   R=   RB   R<   t   remove_readerRF   RE   R.   R,   RV   R0   Rc   Rd   Re   R?   (   R"   R.   R   (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyRG      s2    	
							c         C   sc   t  j d |  j ƒ |  j |  j k s: t d |  j f ƒ ‚ |  j |  _ |  j j |  j	 ƒ t
 |  ƒ S(   sC   Kick off the workflow

        :rtype: AbstractIOReference
        s!   _AsyncStreamConnector.start(); %ss9   _AsyncStreamConnector.start() expected _STATE_NOT_STARTED(   R-   R:   R=   RA   R@   RH   RI   R<   RJ   RK   R1   (   R"   (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyR!   Ã  s    c         C   sb   |  j  |  j k rE |  j |  _  t j d |  j ƒ |  j d t ƒ t St j d |  j  |  j ƒ t S(   s¾   Cancel pending connection request without calling user's completion
        callback.

        :returns: False if was already done or cancelled; True otherwise
        :rtype: bool

        s%   User canceled streaming linkup for %sR.   sD   _AsyncStreamConnector cancel requested when not ACTIVE: state=%s; %s(	   RA   RI   RL   R-   R:   R=   RG   R   RB   (   R"   (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyR2   Ö  s    c         C   sÙ   t  j d | |  j ƒ t | t t f ƒ sC t d | |  j f ƒ ‚ |  j |  j k sj t d |  j f ƒ ‚ |  j	 |  _ zB y |  j
 | ƒ Wn* t k
 r¶ t  j d |  j | ƒ ‚  n XWd |  j d t | t ƒ ƒ Xd S(   s  Advance to COMPLETED state, cancel async operation(s), and invoke
        user's completion callback.

        :param BaseException | tuple result: value to pass in user's callback.
            `tuple(transport, protocol)` on success, exception on error

        s0   _AsyncStreamConnector._report_completion(%r); %ssQ   _AsyncStreamConnector._report_completion() expected exception or tuple as result.sA   _AsyncStreamConnector._report_completion() expected _STATE_ACTIVEs   %r: _on_done(%r) failed.NR.   (   R-   R:   R=   R   RM   t   tupleRH   RA   RI   RO   R?   R,   RV   RQ   RG   (   R"   RP   (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyRQ   ê  s"    		 	c      
   C   sð   t  j d |  j ƒ |  j |  j k rB t  j d |  j |  j ƒ d S|  j d k r^ |  j ƒ  nŽ t  j d |  j ƒ y7 |  j j |  j d t	 d t	 d t	 d |  j
 ƒ|  _ Wn7 t k
 rá } t  j d	 |  j | ƒ |  j | ƒ d SX|  j ƒ  d S(
   s’   Called as callback from I/O loop to kick-start the workflow, so it's
        safe to call user's completion callback from here if needed

        s(   _AsyncStreamConnector._start_async(); %ssM   Abandoning streaming linkup due to inactive state transition; state=%s; %s; .Ns   Starting SSL handshake on %st   server_sidet   do_handshake_on_connectt   suppress_ragged_eofsR*   s   SSL wrap_socket(%s) failed: %r(   R-   R:   R=   RA   RI   Rd   R0   t   _linkupt   wrap_socketRB   Re   R,   RV   RQ   t   _do_ssl_handshake(   R"   R   (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyRK   
  s,    		c         C   s  t  j d ƒ d } yQy |  j ƒ  } Wn, t k
 rT } t  j d | |  j ƒ ‚  n X|  j d k r² y t |  j | |  j	 ƒ } Wqý t k
 r® } t  j d | |  j ƒ ‚  qý XnK y t
 |  j | |  j	 ƒ } Wn, t k
 rü } t  j d | |  j ƒ ‚  n Xt  j d | ƒ y | j | ƒ Wn/ t k
 rO} t  j d | | |  j ƒ ‚  n Xt  j d | | ƒ Wn t k
 r} | } n X| | f } |  j | ƒ d S(	   s}   Connection is ready: instantiate and link up transport and protocol,
        and invoke user's completion callback.

        s   _AsyncStreamConnector._linkup()s'   protocol_factory() failed: error=%r; %ss%   PlainTransport() failed: error=%r; %ss#   SSLTransport() failed: error=%r; %ss   _linkup(): created transport %rs1   protocol.connection_made(%r) failed: error=%r; %ss2   _linkup(): introduced transport to protocol %r; %rN(   R-   R:   R0   Rc   R,   RV   R=   Rd   t   _AsyncPlaintextTransportR<   t   _AsyncSSLTransportt   connection_madeRQ   (   R"   t	   transportt   protocolR   RP   (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyRl   0  sL    		
		
c         C   s  t  j d ƒ |  j |  j k r< t  j d |  j |  j ƒ d St } yy |  j j ƒ  Wnë t j k
 rC} | j	 t j
 k rÔ t  j d |  j ƒ t |  _ |  j j |  j j ƒ  |  j ƒ |  j j |  j j ƒ  ƒ q]| j	 t j k r=t  j d |  j ƒ t |  _ |  j j |  j j ƒ  |  j ƒ |  j j |  j j ƒ  ƒ q]‚  n Xt } t  j d |  j ƒ Wn7 t k
 r—} t  j d | |  j ƒ |  j | ƒ d SX| rt  j d |  j ƒ |  j j |  j j ƒ  ƒ |  j j |  j j ƒ  ƒ t |  _ t  j d	 |  j ƒ |  j ƒ  n  d S(
   sJ   Perform asynchronous SSL handshake on the already wrapped socket

        s)   _AsyncStreamConnector._do_ssl_handshake()s`   _do_ssl_handshake: Abandoning streaming linkup due to inactive state transition; state=%s; %s; .Ns   SSL handshake wants read; %s.s   SSL handshake wants write. %ss(   SSL handshake completed successfully: %ss%   SSL do_handshake failed: error=%r; %ss8   _do_ssl_handshake: removing watchers ahead of linkup: %ss=   _do_ssl_handshake: pre-linkup removal of watchers is done; %s(   R-   R:   RA   RI   R=   RB   t   do_handshakeR`   t   SSLErrorR   t   SSL_ERROR_WANT_READR   Rf   R<   t
   set_readerRF   Rn   RE   t   SSL_ERROR_WANT_WRITERT   Rg   RZ   R,   RV   RQ   Rl   (   R"   t   doneR   (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyRn   j  sX    
	
	
	

	
(   R$   R%   R&   R@   RI   RL   RO   R5   R_   RG   R!   R2   RQ   RK   Rl   Rn   (    (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyR+   d  s   	0#		 &:t   _AsyncTransportBasec           B   sí   e  Z d  Z d Z d Z d Z d Z d Z d Z d e	 f d	 „  ƒ  YZ
 d
 „  Z d „  Z d „  Z d „  Z d „  Z d „  Z d „  Z e e d „  ƒ ƒ Z e e d „  ƒ ƒ Z e d „  ƒ Z e d „  ƒ Z e d „  ƒ Z e d „  ƒ Z RS(   sI   Base class for `_AsyncPlaintextTransport` and `_AsyncSSLTransport`.

    i   i   i   i   i   i   id   t   RxEndOfFilec           B   s   e  Z d  Z d „  Z RS(   sN   We raise this internally when EOF (empty read) is detected on input.

        c         C   s    t  t j |  ƒ j d d ƒ d  S(   Niÿÿÿÿs   End of input stream (EOF)(   t   superRz   R{   R5   (   R"   (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyR5   ½  s    (   R$   R%   R&   R5   (    (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyR{   ¸  s   c         C   sS   t  j d | ƒ | |  _ | |  _ | |  _ |  j |  _ t j ƒ  |  _	 d |  _
 d S(   s~  

        :param socket.socket | ssl.SSLSocket sock: connected socket
        :param pika.adapters.utils.nbio_interface.AbstractStreamProtocol protocol:
            corresponding protocol in this transport/protocol pairing; the
            protocol already had its `connection_made()` method called.
        :param AbstractIOServices | AbstractFileDescriptorServices nbio:

        s    _AsyncTransportBase.__init__: %si    N(   R-   R:   R=   t	   _protocolR<   RI   RA   t   collectionst   dequet   _tx_bufferst   _tx_buffered_byte_count(   R"   R   Rs   R   (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyR5   Á  s    
			c         C   s*   t  j d |  j |  j ƒ |  j d ƒ d S(   s  Close connection abruptly without waiting for pending I/O to
        complete. Will invoke the corresponding protocol's `connection_lost()`
        method asynchronously (not in context of the abort() call).

        :raises Exception: Exception-based exception on error
        s+   Aborting transport connection: state=%s; %sN(   R-   RZ   RA   R=   t   _initiate_abortR0   (   R"   (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyt   abortÔ  s    
c         C   s   |  j  S(   s   Return the protocol linked to this transport.

        :rtype: pika.adapters.utils.nbio_interface.AbstractStreamProtocol
        (   R}   (   R"   (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyt   get_protocolà  s    c         C   s   |  j  S(   se   
        :returns: Current size of output data buffered by the transport
        :rtype: int
        (   R   (   R"   (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyt   get_write_buffer_sizeç  s    c         C   s   | s7 t  j d |  j |  j ƒ t d j | ƒ ƒ ‚ n  |  j |  j k rf t  j d |  j |  j ƒ d S|  j j	 | ƒ |  j
 t | ƒ 7_
 d S(   s–   Buffer the given data until it can be sent asynchronously.

        :param bytes data:
        :raises ValueError: if called with empty data

        s,   write() called with empty data: state=%s; %ss#   write() called with empty data {!r}s;   Ignoring write() called during inactive state: state=%s; %sN(   R-   R   RA   R=   R;   R   RI   R:   R€   t   appendR   t   len(   R"   t   data(    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyt   _buffer_tx_dataî  s    	c         C   sÅ   d } x¸ |  j  |  j k rÀ | |  j k  rÀ |  j |  j |  j ƒ } | t | ƒ 7} | sz t j d |  j ƒ |  j	 ƒ  ‚ n  y |  j
 j | ƒ Wq	 t k
 r¼ } t j d | |  j ƒ ‚  q	 Xq	 Wd S(   sï  Utility method for use by subclasses to ingest data from socket and
        dispatch it to protocol's `data_received()` method socket-specific
        "try again" exception, per-event data consumption limit is reached,
        transport becomes inactive, or a fatal failure.

        Consumes up to `self._MAX_CONSUME_BYTES` to prevent event starvation or
        until state becomes inactive (e.g., `protocol.data_received()` callback
        aborts the transport)

        :raises: Whatever the corresponding `sock.recv()` raises except the
                 socket error with errno.EINTR
        :raises: Whatever the `protocol.data_received()` callback raises
        :raises _AsyncTransportBase.RxEndOfFile: upon shutdown of input stream

        i    s   Socket EOF; %ss-   protocol.data_received() failed: error=%r; %sN(   RA   RI   t   _MAX_CONSUME_BYTESt   _sigint_safe_recvR=   t   _MAX_RECV_BYTESR‡   R-   R   R{   R}   t   data_receivedR,   RV   (   R"   t   bytes_consumedRˆ   R   (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyt   _consume  s    
c         C   sº   x³ |  j  rµ |  j |  j |  j  d ƒ } |  j  j ƒ  } | t | ƒ k  ry t j d | t | ƒ ƒ |  j  j | | ƒ n  |  j | 8_ |  j d k s t	 d |  j |  j
 f ƒ ‚ q Wd S(   s  Utility method for use by subclasses to emit data from tx_buffers.
        This method sends chunks from `tx_buffers` until all chunks are
        exhausted or sending is interrupted by an exception. Maintains integrity
        of `self.tx_buffers`.

        :raises: whatever the corresponding `sock.send()` raises except the
                 socket error with errno.EINTR

        i    s/   Partial send, requeing remaining data; %s of %ss7   _AsyncTransportBase._produce() tx buffer size underflowN(   R€   t   _sigint_safe_sendR=   t   popleftR‡   R-   R:   t
   appendleftR   RH   RA   (   R"   t   num_bytes_sentt   chunk(    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyt   _produce(  s    
	c         C   s   |  j  | ƒ S(   sm  Receive data from socket, retrying on SIGINT.

        :param sock: stream or SSL socket
        :param max_bytes: maximum number of bytes to receive
        :returns: received data or empty bytes uppon end of file
        :rtype: bytes
        :raises: whatever the corresponding `sock.recv()` raises except socket
                 error with errno.EINTR

        (   t   recv(   R   t	   max_bytes(    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyR‹   A  s    c         C   s   |  j  | ƒ S(   s@  Send data to socket, retrying on SIGINT.

        :param sock: stream or SSL socket
        :param data: data bytes to send
        :returns: number of bytes actually sent
        :rtype: int
        :raises: whatever the corresponding `sock.send()` raises except socket
                 error with errno.EINTR

        (   t   send(   R   Rˆ   (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyR   P  s    c         C   sq   |  j  |  j k rm t j d |  j  |  j ƒ |  j j |  j j ƒ  ƒ |  j j |  j j ƒ  ƒ |  j	 j
 ƒ  n  d S(   s2   Unregister the transport from I/O events

        s$   Deactivating transport: state=%s; %sN(   RA   RI   R-   RZ   R=   R<   Rg   RF   RE   R€   t   clear(   R"   (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyt   _deactivate_  s    
c         C   s—   |  j  |  j k r“ t j d |  j  |  j ƒ y |  j j t j ƒ Wn t j	 j
 k
 r[ n X|  j j ƒ  d |  _ d |  _ d |  _ |  j |  _  n  d S(   s{   Close the transport's socket and unlink the transport it from
        references to other assets (protocol, etc.)

        s4   Closing transport socket and unlinking: state=%s; %sN(   RA   RO   R-   RZ   R=   t   shutdownR7   t	   SHUT_RDWRR   R   R   R.   R0   R}   R<   (   R"   (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyt   _close_and_finalizek  s    				c         C   s  t  j d |  j | |  j ƒ |  j |  j k sC t d |  j f ƒ ‚ |  j |  j k rY d S|  j ƒ  | d k r¡ |  j |  j k r’ t  j	 d ƒ d S|  j |  _ nI |  j |  j
 k rÞ |  j |  j k sÚ t d |  j f ƒ ‚ d S|  j |  _ |  j j t j |  j | ƒ ƒ d S(   s£  Initiate asynchronous abort of the transport that concludes with a
        call to the protocol's `connection_lost()` method. No flushing of
        output buffers will take place.

        :param BaseException | None error: None if being canceled by user,
            including via falsie return value from protocol.eof_received;
            otherwise the exception corresponding to the the failed connection.
        so   _AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=%s; error=%r; %ssB   _AsyncTransportBase._initate_abort() expected non-_STATE_COMPLETEDNsM   _AsyncTransportBase._initiate_abort(): ignoring - user-abort already pending.sD   _AsyncTransportBase._initate_abort() expected _STATE_ABORTED_BY_USER(   R-   RZ   RA   R=   RO   RH   Rš   R0   t   _STATE_ABORTED_BY_USERR:   RI   t   _STATE_FAILEDR<   RJ   R   t   partialt   _connection_lost_notify_async(   R"   R   (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyR‚   ~  s,    

	c         C   sÑ   t  j d |  j | ƒ |  j |  j k r, d S| d k	 ru |  j |  j k ru |  j |  j k sq t d |  j f ƒ ‚ d SzJ y |  j j	 | ƒ Wn/ t
 k
 r½ } t  j d | | |  j ƒ ‚  n XWd |  j ƒ  Xd S(   sž  Handle aborting of transport either due to socket error or user-
        initiated `abort()` call. Must be called from an I/O loop callback owned
        by us in order to avoid reentry into user code from user's API call into
        the transport.

        :param BaseException | None error: None if being canceled by user;
            otherwise the exception corresponding to the the failed connection.
        s1   Concluding transport shutdown: state=%s; error=%rNsS   _AsyncTransportBase._connection_lost_notify_async() expected _STATE_ABORTED_BY_USERs/   protocol.connection_lost(%r) failed: exc=%r; %s(   R-   R:   RA   RO   R0   RŸ   Rž   RH   R}   t   connection_lostR,   RV   R=   R   (   R"   R   t   exc(    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyR¡   ³  s"    
	 	i  (   R$   R%   R&   RI   RŸ   Rž   RO   RŒ   RŠ   t   OSErrorR{   R5   Rƒ   R„   R…   R‰   R   R•   t   staticmethodR   R‹   R   R_   Rš   R   R‚   R¡   (    (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyRz   ¨  s.   							%	5Ro   c           B   s>   e  Z d  Z d „  Z d „  Z e d „  ƒ Z e d „  ƒ Z RS(   s`   Implementation of `nbio_interface.AbstractStreamTransport` for a
    plaintext connection.

    c         C   s?   t  t |  ƒ j | | | ƒ |  j j |  j j ƒ  |  j ƒ d S(   s{  

        :param socket.socket sock: non-blocking connected socket
        :param pika.adapters.utils.nbio_interface.AbstractStreamProtocol protocol:
            corresponding protocol in this transport/protocol pairing; the
            protocol already had its `connection_made()` method called.
        :param AbstractIOServices | AbstractFileDescriptorServices nbio:

        N(   R|   Ro   R5   R<   Rw   R=   RF   t   _on_socket_readable(   R"   R   Rs   R   (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyR5   Ý  s    
c         C   sŸ   |  j  |  j k r/ t j d |  j  |  j ƒ d S| sM t d | |  j  f ƒ ‚ |  j ƒ  sŽ |  j j |  j j	 ƒ  |  j
 ƒ t j d |  j ƒ n  |  j | ƒ d S(   s–   Buffer the given data until it can be sent asynchronously.

        :param bytes data:
        :raises ValueError: if called with empty data

        s;   Ignoring write() called during inactive state: state=%s; %sNs7   _AsyncPlaintextTransport.write(): empty data from user.s!   Turned on writability watcher: %s(   RA   RI   R-   R:   R=   RH   R…   R<   RT   RF   t   _on_socket_writableR‰   (   R"   Rˆ   (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyt   writeí  s    c         C   s½  |  j  |  j k r/ t j d |  j  |  j ƒ d Sy |  j ƒ  WnK|  j k
 rô y |  j j ƒ  } Wn6 t	 k
 r› } t j
 d | |  j ƒ |  j | ƒ q¹X| rÑ t j d |  j ƒ |  j j |  j j ƒ  ƒ q¹t j d |  j ƒ |  j d ƒ nÅ t	 t j j f k
 rŠ} t | t j j ƒ rI| j t k rIt j d |  j ƒ q¹t j
 d | |  j d j t j t j ƒ  Œ  ƒ ƒ |  j | ƒ n/ X|  j  |  j k r¹t j d	 |  j  |  j ƒ n  d S(
   sÜ   Ingest data from socket and dispatch it to protocol until exception
        occurs (typically EAGAIN or EWOULDBLOCK), per-event data consumption
        limit is reached, transport becomes inactive, or failure.

        sE   Ignoring readability notification due to inactive state: state=%s; %sNs,   protocol.eof_received() failed: error=%r; %ss0   protocol.eof_received() elected to keep open: %ss,   protocol.eof_received() elected to close: %ss   Recv would block on %ssa   _AsyncBaseTransport._consume() failed, aborting connection: error=%r; sock=%s; Caller's stack:
%st    s>   Leaving Plaintext consumer due to inactive state: state=%s; %s(   RA   RI   R-   R:   R=   R   R{   R}   t   eof_receivedR,   RV   R‚   RZ   R<   Rg   RF   R0   R   R   R   R   R   t   _TRY_IO_AGAIN_SOCK_ERROR_CODESt   joint	   tracebackt   format_exceptiont   syst   exc_info(   R"   t	   keep_openR   (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyR¦     sH    

	
c         C   s1  |  j  |  j k r/ t j d |  j  |  j ƒ d S|  j sM t d |  j  f ƒ ‚ y |  j ƒ  Wn— t t	 j
 j f k
 rô } t | t	 j
 j ƒ r³ | j t k r³ t j d |  j ƒ q-t j d | |  j d j t j t j ƒ  Œ  ƒ ƒ |  j | ƒ n9 X|  j s-|  j j |  j j ƒ  ƒ t j d |  j ƒ n  d S(   s-   Handle writable socket notification

        sE   Ignoring writability notification due to inactive state: state=%s; %sNsP   _AsyncPlaintextTransport._on_socket_writable() called, but _tx_buffers is empty.s   Send would block on %ssa   _AsyncBaseTransport._produce() failed, aborting connection: error=%r; sock=%s; Caller's stack:
%sR©   s"   Turned off writability watcher: %s(   RA   RI   R-   R:   R=   R€   RH   R•   R,   R   R   R   R   R   R«   RV   R¬   R­   R®   R¯   R°   R‚   R<   RE   RF   (   R"   R   (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyR§   7  s,    	(   R$   R%   R&   R5   R¨   R_   R¦   R§   (    (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyRo   ×  s
   		4Rp   c           B   s\   e  Z d  Z d „  Z d „  Z e d „  ƒ Z e d „  ƒ Z e d „  ƒ Z e d „  ƒ Z	 RS(   s\   Implementation of `.nbio_interface.AbstractStreamTransport` for an SSL
    connection.

    c         C   sg   t  t |  ƒ j | | | ƒ |  j |  _ d |  _ |  j j |  j	 j
 ƒ  |  j ƒ |  j j |  j ƒ d S(   s{  

        :param ssl.SSLSocket sock: non-blocking connected socket
        :param pika.adapters.utils.nbio_interface.AbstractStreamProtocol protocol:
            corresponding protocol in this transport/protocol pairing; the
            protocol already had its `connection_made()` method called.
        :param AbstractIOServices | AbstractFileDescriptorServices nbio:

        N(   R|   Rp   R5   R   t   _ssl_readable_actionR0   t   _ssl_writable_actionR<   Rw   R=   RF   R¦   RJ   (   R"   R   Rs   R   (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyR5   a  s
    
	c         C   s¨   |  j  |  j k r/ t j d |  j  |  j ƒ d S|  j ƒ  d k } |  j | ƒ | r¤ |  j d k r¤ |  j	 |  _ |  j
 j |  j j ƒ  |  j ƒ t j d |  j ƒ n  d S(   s–   Buffer the given data until it can be sent asynchronously.

        :param bytes data:
        :raises ValueError: if called with empty data

        s;   Ignoring write() called during inactive state: state=%s; %sNi    s!   Turned on writability watcher: %s(   RA   RI   R-   R:   R=   R…   R‰   R³   R0   R•   R<   RT   RF   R§   (   R"   Rˆ   t   tx_buffer_was_empty(    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyR¨   u  s    c         C   s‰   |  j  |  j k r/ t j d |  j  |  j ƒ d S|  j rl y |  j ƒ  Wq… t k
 rh } |  j | ƒ q… Xn t j d |  j |  j ƒ d S(   s+   Handle readable socket indication

        sE   Ignoring readability notification due to inactive state: state=%s; %sNs>   SSL readable action was suppressed: ssl_writable_action=%r; %s(	   RA   RI   R-   R:   R=   R²   R,   R‚   R³   (   R"   R   (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyR¦   ‹  s    	c         C   s‰   |  j  |  j k r/ t j d |  j  |  j ƒ d S|  j rl y |  j ƒ  Wq… t k
 rh } |  j | ƒ q… Xn t j d |  j |  j ƒ d S(   s-   Handle writable socket notification

        sE   Ignoring writability notification due to inactive state: state=%s; %sNs>   SSL writable action was suppressed: ssl_readable_action=%r; %s(	   RA   RI   R-   R:   R=   R³   R,   R‚   R²   (   R"   R   (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyR§   ¡  s    	c         C   s#  t  } y t t |  ƒ j ƒ  Wn  t j k
 r¿ } | j t j k rZ t j	 d |  j
 ƒ q| j t j k rˆ t j	 d |  j
 ƒ t } qt j d | |  j
 d j t j t j ƒ  Œ  ƒ ƒ ‚  nC X|  j |  j k rï t j	 d |  j |  j
 ƒ d S|  j j |  j ƒ | ry|  j s3|  j j |  j
 j ƒ  |  j ƒ n  |  j |  _ |  j |  j k rÞ|  j j |  j
 j ƒ  ƒ d |  _ qÞne |  j s¤|  j j |  j
 j ƒ  |  j ƒ n  |  j |  _ |  j rÞ|  j j  |  j
 j ƒ  ƒ d |  _ n  |  j! r|  j r|  j" |  _ |  j j |  j
 j ƒ  |  j ƒ n  d S(   s“  [override] Ingest data from socket and dispatch it to protocol until
        exception occurs (typically ssl.SSLError with
        SSL_ERROR_WANT_READ/WRITE), per-event data consumption limit is reached,
        transport becomes inactive, or failure.

        Update consumer/producer registration.

        :raises Exception: error that signals that connection needs to be
            aborted
        s   SSL ingester wants read: %ss   SSL ingester wants write: %ssa   _AsyncBaseTransport._consume() failed, aborting connection: error=%r; sock=%s; Caller's stack:
%sR©   s8   Leaving SSL consumer due to inactive state: state=%s; %sN(#   R   R|   Rp   R   R`   Ru   R   Rv   R-   R:   R=   Rx   RB   RV   R¬   R­   R®   R¯   R°   RA   RI   R<   RJ   R¦   R²   Rw   RF   R³   RE   R0   RT   R§   Rg   R€   R•   (   R"   t   next_consume_on_readableR   (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyR   ·  sL    				c         C   s#  d } y t t |  ƒ j ƒ  Wn¦ t j k
 rÅ } | j t j k r` t j	 d |  j
 ƒ t } që | j t j k rŽ t j	 d |  j
 ƒ t } që t j d | |  j
 d j t j t j ƒ  Œ  ƒ ƒ ‚  n& X|  j së t d t |  j ƒ f ƒ ‚ |  j rô| d k	 st d |  j f ƒ ‚ | rŒ|  j sF|  j j |  j
 j ƒ  |  j ƒ n  |  j |  _ |  j |  j k rñ|  j j |  j
 j ƒ  ƒ d |  _ qñq°|  j s·|  j j |  j
 j ƒ  |  j  ƒ n  |  j |  _ |  j r°|  j j! |  j
 j ƒ  ƒ d |  _ q°n¼ |  j |  j k rR|  j j |  j
 j ƒ  ƒ d |  _ |  j |  j k s°t d |  j f ƒ ‚ n^ |  j |  j k sŽt d d	 |  j d
 |  j d |  j f ƒ ‚ d |  _ |  j j! |  j
 j ƒ  ƒ |  j sú|  j" |  _ |  j j |  j
 j ƒ  |  j  ƒ |  j j# |  j  ƒ n% |  j
 j$ ƒ  r|  j j# |  j  ƒ n  d S(   sI  [override] Emit data from tx_buffers all chunks are exhausted or
        sending is interrupted by an exception (typically ssl.SSLError with
        SSL_ERROR_WANT_READ/WRITE).

        Update consumer/producer registration.

        :raises Exception: error that signals that connection needs to be
            aborted

        s   SSL emitter wants read: %ss   SSL emitter wants write: %ssa   _AsyncBaseTransport._produce() failed, aborting connection: error=%r; sock=%s; Caller's stack:
%sR©   s_   _AsyncSSLTransport._produce(): no exception from parent class, but data remains in _tx_buffers.sE   _AsyncSSLTransport._produce(): next_produce_on_writable is still Nonesr   _AsyncSSLTransport._produce(): with empty tx_buffers, writable_action cannot be _produce when readable is _produces   _AsyncSSLTransport._produce(): with empty tx_buffers, expected writable_action as _produce when readable_action is not _produces   writable_action:s   readable_action:s   state:N(%   R0   R|   Rp   R•   R`   Ru   R   Rv   R-   R:   R=   RB   Rx   R   RV   R¬   R­   R®   R¯   R°   R€   RH   R‡   RA   R³   R<   RT   RF   R§   R²   Rg   Rw   R¦   RE   R   RJ   t   pending(   R"   t   next_produce_on_writableR   (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyR•      sr    										(
   R$   R%   R&   R5   R¨   R_   R¦   R§   R   R•   (    (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyRp   [  s   		I((   R&   R~   R   R   t   loggingR	   R[   R7   R`   R¯   R­   t"   pika.adapters.utils.nbio_interfaceR    R   t   pika.compatR   t   pika.diagnostic_utilst   EAGAINt   EWOULDBLOCKR«   t   EINPROGRESSRS   t	   getLoggerR$   R-   t   diagnostic_utilst   create_log_exception_decoratorR_   R   R   R   t   objectR   R'   R1   R    R+   Rz   Ro   Rp   (    (    (    sJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyt   <module>   sB   			)¹ÿ Eÿ /„