l
Ì	g]c               @   s¨  d  Z  d d l Z d d l Z d d l Z d d l Z d d l Z d d l Z d d l Z d d l Z d d l	 Z	 d d l
 Z
 d d l m Z m Z d d l Z d d l Z e j e j f Z e j e j f Z e j e ƒ Z e j j e ƒ Z d „  Z d „  Z d „  Z Gd „  d e ƒ Z  Gd	 „  d
 e ƒ Z! Gd „  d e ƒ Z" Gd „  d e ƒ Z# Gd „  d e ƒ Z$ Gd „  d e ƒ Z% Gd „  d e% ƒ Z& Gd „  d e% ƒ Z' d S(   u^   Utilities for implementing `nbio_interface.AbstractIOServices` for
pika connection adapters.

i    N(   u   AbstractIOReferenceu   AbstractStreamTransportc             C   s+   t  |  ƒ s' t d j | |  ƒ ƒ ‚ n  d S(   u£   Raise TypeError if callback is not callable

    :param callback: callback to check
    :param name: Name to include in exception text
    :raises TypeError:

    u!   {} must be callable, but got {!r}N(   u   callableu	   TypeErroru   format(   u   callbacku   name(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   check_callback_arg,   s    	c             C   s.   t  |  t j ƒ s* t d j |  ƒ ƒ ‚ n  d S(   uq   Raise TypeError if file descriptor is not an integer

    :param fd: file descriptor
    :raises TypeError:

    u0   Paramter must be a file descriptor, but got {!r}N(   u
   isinstanceu   numbersu   Integralu	   TypeErroru   format(   u   fd(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   check_fd_arg9   s    c                s"   t  j ˆ  ƒ ‡  f d †  ƒ } | S(   u0   Function decorator for retrying on SIGINT.

    c                 sd   x] y ˆ  |  | Ž  SWq t  j j k
 r\ } z  | j t j k rG w n ‚  WYd d } ~ Xq Xq d S(   u   Wrapper for decorated functionN(   u   pikau   compatu   SOCKET_ERRORu   errnou   EINTR(   u   argsu   kwargsu   error(   u   func(    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   retry_sigint_wrapJ   s    (   u	   functoolsu   wraps(   u   funcu   retry_sigint_wrap(    (   u   funcuJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   _retry_on_sigintE   s    c             B   s   |  Ee  Z d  Z d „  Z d S(   uú   Implements
    `pika.adapters.utils.nbio_interface.AbstractIOServices.connect_socket()`
    on top of
    `pika.adapters.utils.nbio_interface.AbstractFileDescriptorServices` and
    basic `pika.adapters.utils.nbio_interface.AbstractIOServices`.

    c          	   C   s%   t  d |  d | d | d | ƒ j ƒ  S(   u[   Implement
        :py:meth:`.nbio_interface.AbstractIOServices.connect_socket()`.

        u   nbiou   socku   resolved_addru   on_done(   u   _AsyncSocketConnectoru   start(   u   selfu   socku   resolved_addru   on_done(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   connect_socketb   s    N(   u   __name__u
   __module__u   __doc__u   connect_socket(   u
   __locals__(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   SocketConnectionMixinY   s   
u   SocketConnectionMixinc             B   s#   |  Ee  Z d  Z d d d „ Z d S(   uÒ   Implements
    `.nbio_interface.AbstractIOServices.create_streaming_connection()` on
    top of `.nbio_interface.AbstractFileDescriptorServices` and basic
    `nbio_interface.AbstractIOServices` services.

    c             C   sÀ   y5 t  d |  d | d | d | d | d | ƒ j ƒ  SWn„ t k
 r» } zd t j d | | ƒ y | j ƒ  Wn8 t k
 r¥ } z t j d | | ƒ WYd	 d	 } ~ Xn X‚  WYd	 d	 } ~ Xn Xd	 S(
   uh   Implement
        :py:meth:`.nbio_interface.AbstractIOServices.create_streaming_connection()`.

        u   nbiou   protocol_factoryu   socku   ssl_contextu   server_hostnameu   on_doneu*   create_streaming_connection(%s) failed: %ru   %s.close() failed: %rN(   u   _AsyncStreamConnectoru   startu	   Exceptionu   _LOGGERu   erroru   close(   u   selfu   protocol_factoryu   socku   on_doneu   ssl_contextu   server_hostnameu   error(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   create_streaming_connectiont   s     
&N(   u   __name__u
   __module__u   __doc__u   Noneu   create_streaming_connection(   u
   __locals__(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   StreamingConnectionMixinl   s   
u   StreamingConnectionMixinc             B   s&   |  Ee  Z d  Z d „  Z d „  Z d S(   uG   This module's adaptation of `.nbio_interface.AbstractIOReference`

    c             C   s   | j  |  _ d S(   uZ   
        :param subject: subject of the reference containing a `cancel()` method

        N(   u   cancelu   _cancel(   u   selfu   subject(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   __init__š   s    c             C   s
   |  j  ƒ  S(   u   Cancel pending operation

        :returns: False if was already done or cancelled; True otherwise
        :rtype: bool

        (   u   _cancel(   u   self(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   cancel¡   s    N(   u   __name__u
   __module__u   __doc__u   __init__u   cancel(   u
   __locals__(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   _AsyncServiceAsyncHandle•   s   
	u   _AsyncServiceAsyncHandlec             B   sƒ   |  Ee  Z d  Z d Z d Z d Z d Z d „  Z e d „  ƒ Z	 d „  Z
 d „  Z e d	 „  ƒ Z e d
 „  ƒ Z e d „  ƒ Z d S(   uú   Connects the given non-blocking socket asynchronously using
    `.nbio_interface.AbstractFileDescriptorServices` and basic
    `.nbio_interface.AbstractIOServices`. Used for implementing
    `.nbio_interface.AbstractIOServices.connect_socket()`.
    i    i   i   i   c             C   sÚ   t  | d ƒ y t j | j | d ƒ Wnr t k
 rœ } zR t t d ƒ s\ t j d ƒ n. d j | | | ƒ } t j	 | ƒ t
 | ƒ ‚ WYd d } ~ Xn X| |  _ | |  _ | |  _ | |  _ |  j |  _ d |  _ d S(   u  
        :param AbstractIOServices | AbstractFileDescriptorServices nbio:
        :param socket.socket sock: non-blocking socket that needs to be
            connected via `socket.socket.connect()`
        :param tuple resolved_addr: resolved destination address/port two-tuple
            which is compatible with the given's socket's address family
        :param callable on_done: user callback that takes None upon successful
            completion or exception upon error (check for `BaseException`) as
            its only arg. It will not be called if the operation was cancelled.
        :raises ValueError: if host portion of `resolved_addr` is not an IP
            address or is inconsistent with the socket's address family as
            validated via `socket.inet_pton()`
        u   on_donei    u	   inet_ptonu8   Unable to check resolved address: no socket.inet_pton().u9   Invalid or unresolved IP address {!r} for socket {}: {!r}NF(   u   check_callback_argu   socketu	   inet_ptonu   familyu	   Exceptionu   hasattru   _LOGGERu   debugu   formatu   erroru
   ValueErroru   _nbiou   _socku   _addru   _on_doneu   _STATE_NOT_STARTEDu   _stateu   Falseu   _watching_socket_events(   u   selfu   nbiou   socku   resolved_addru   on_doneu   erroru   msg(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   __init__·   s"    
				c             C   s2   |  j  r. d |  _  |  j j |  j j ƒ  ƒ n  d S(   u'   Remove socket watcher, if any

        NF(   u   _watching_socket_eventsu   Falseu   _nbiou   remove_writeru   _socku   fileno(   u   self(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   _cleanupÛ   s    		c             C   sP   |  j  |  j k s' t d |  j  f ƒ ‚ |  j |  _  |  j j |  j ƒ t |  ƒ S(   uZ   Start asynchronous connection establishment.

        :rtype: AbstractIOReference
        u:   _AsyncSocketConnector.start(): expected _STATE_NOT_STARTED(   u   _stateu   _STATE_NOT_STARTEDu   AssertionErroru   _STATE_ACTIVEu   _nbiou   add_callback_threadsafeu   _start_asyncu   _AsyncServiceAsyncHandle(   u   self(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   startä   s    c             C   sb   |  j  |  j k rE |  j |  _  t j d |  j |  j ƒ |  j ƒ  d St j d |  j  |  j ƒ d S(   u¾   Cancel pending connection request without calling user's completion
        callback.

        :returns: False if was already done or cancelled; True otherwise
        :rtype: bool

        u-   User canceled connection request for %s to %suD   _AsyncSocketConnector cancel requested when not ACTIVE: state=%s; %sTF(
   u   _stateu   _STATE_ACTIVEu   _STATE_CANCELEDu   _LOGGERu   debugu   _socku   _addru   _cleanupu   Trueu   False(   u   self(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   cancelõ   s    	
c             C   s‘   t  j d | |  j ƒ t | t t d ƒ f ƒ sC t d | f ƒ ‚ |  j |  j	 k sj t d |  j f ƒ ‚ |  j
 |  _ |  j ƒ  |  j | ƒ d S(   u¹   Advance to COMPLETED state, remove socket watcher, and invoke user's
        completion callback.

        :param BaseException | None result: value to pass in user's callback

        u0   _AsyncSocketConnector._report_completion(%r); %suP   _AsyncSocketConnector._report_completion() expected exception or None as result.uF   _AsyncSocketConnector._report_completion() expected _STATE_NOT_STARTEDN(   u   _LOGGERu   debugu   _socku
   isinstanceu   BaseExceptionu   typeu   Noneu   AssertionErroru   _stateu   _STATE_ACTIVEu   _STATE_COMPLETEDu   _cleanupu   _on_done(   u   selfu   result(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   _report_completion	  s    	
c             C   sf  |  j  |  j k r5 t j d |  j |  j |  j  ƒ d Sy |  j j |  j ƒ Wn… t t j	 j
 f k
 rÓ } zY t | t j	 j
 ƒ r” | j t k r” n- t j d |  j |  j | ƒ |  j | ƒ d SWYd d } ~ Xn Xy# |  j j |  j j ƒ  |  j ƒ WnL t k
 rE} z, t j d |  j | ƒ |  j | ƒ d SWYd d } ~ Xn Xd |  _ t j d |  j ƒ d S(   u“   Called as callback from I/O loop to kick-start the workflow, so it's
        safe to call user's completion callback from here, if needed

        uJ   Abandoning sock=%s connection establishment to %s due to inactive state=%sNu   %s.connect(%s) failed: %ru   async.set_writer(%s) failed: %ru/   Connection-establishment is in progress for %s.T(   u   _stateu   _STATE_ACTIVEu   _LOGGERu   debugu   _socku   _addru   connectu	   Exceptionu   pikau   compatu   SOCKET_ERRORu
   isinstanceu   errnou(   _CONNECTION_IN_PROGRESS_SOCK_ERROR_CODESu   erroru   _report_completionu   _nbiou
   set_writeru   filenou   _on_writableu	   exceptionu   Trueu   _watching_socket_events(   u   selfu   error(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   _start_async   s2    #		c             C   sº   |  j  |  j k r/ t j d |  j |  j  ƒ d S|  j j t j t j ƒ } | sl t j	 d |  j ƒ d } n= t j | ƒ } t j d |  j | | ƒ t j j | | ƒ } |  j | ƒ d S(   uw   Called when socket connects or fails to. Check for predicament and
        invoke user's completion callback.

        u_   Socket connection-establishment event watcher called in inactive state (ignoring): %s; state=%sNu   Socket connected: %su+   Socket failed to connect: %s; error=%s (%s)(   u   _stateu   _STATE_ACTIVEu   _LOGGERu   erroru   _socku
   getsockoptu   socketu
   SOL_SOCKETu   SO_ERRORu   infou   Noneu   osu   strerroru   pikau   compatu   SOCKET_ERRORu   _report_completion(   u   selfu
   error_codeu   resultu	   error_msg(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   _on_writableG  s    
		N(   u   __name__u
   __module__u   __doc__u   _STATE_NOT_STARTEDu   _STATE_ACTIVEu   _STATE_CANCELEDu   _STATE_COMPLETEDu   __init__u   _log_exceptionsu   _cleanupu   startu   cancelu   _report_completionu   _start_asyncu   _on_writable(   u
   __locals__(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   _AsyncSocketConnector«   s   
	$			'u   _AsyncSocketConnectorc             B   s’   |  Ee  Z d  Z d Z d Z d Z d Z d „  Z e d „  ƒ Z	 d „  Z
 d „  Z e d	 „  ƒ Z e d
 „  ƒ Z e d „  ƒ Z e d „  ƒ Z d S(   uù   Performs asynchronous SSL session establishment, if requested, on the
    already-connected socket and links the streaming transport to protocol.
    Used for implementing
    `.nbio_interface.AbstractIOServices.create_streaming_connection()`.

    i    i   i   i   c             C   s  t  | d ƒ t  | d ƒ t | t d ƒ t j f ƒ sP t d j | ƒ ƒ ‚ n  | d k	 rw | d k rw t d ƒ ‚ n  y | j ƒ  Wn= t	 k
 rÄ } z t d j | | ƒ ƒ ‚ WYd d } ~ Xn X| |  _
 | |  _ | |  _ | |  _ | |  _ | |  _ |  j |  _ d |  _ d S(   u  
        NOTE: We take ownership of the given socket upon successful completion
        of the constructor.

        See `AbstractIOServices.create_streaming_connection()` for detailed
        documentation of the corresponding args.

        :param AbstractIOServices | AbstractFileDescriptorServices nbio:
        :param callable protocol_factory:
        :param socket.socket sock:
        :param ssl.SSLContext | None ssl_context:
        :param str | None server_hostname:
        :param callable on_done:

        u   protocol_factoryu   on_doneu8   Expected ssl_context=None | ssl.SSLContext, but got {!r}u?   Non-None server_hostname must not be passed without ssl contextuE   Expected connected socket, but getpeername() failed: error={!r}; {}; NF(   u   check_callback_argu
   isinstanceu   typeu   Noneu   sslu
   SSLContextu
   ValueErroru   formatu   getpeernameu	   Exceptionu   _nbiou   _protocol_factoryu   _socku   _ssl_contextu   _server_hostnameu   _on_doneu   _STATE_NOT_STARTEDu   _stateu   Falseu   _watching_socket(   u   selfu   nbiou   protocol_factoryu   socku   ssl_contextu   server_hostnameu   on_doneu   error(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   __init__p  s*    	"						c             C   s   t  j d | ƒ |  j rm t  j d | |  j ƒ d |  _ |  j j |  j j ƒ  ƒ |  j j |  j j ƒ  ƒ n  zu | rá t  j d | |  j ƒ y |  j j	 ƒ  Wqá t
 k
 rÝ } z t  j d | |  j ƒ ‚  WYd d } ~ Xqá Xn  Wd d |  _ d |  _ d |  _ d |  _ d |  _ d |  _ Xd S(   ue   Cancel pending async operations, if any

        :param bool close: close the socket if true
        u"   _AsyncStreamConnector._cleanup(%r)u5   _AsyncStreamConnector._cleanup(%r): removing RdWr; %su6   _AsyncStreamConnector._cleanup(%r): closing socket; %su"   _sock.close() failed: error=%r; %sNF(   u   _LOGGERu   debugu   _watching_socketu   _socku   Falseu   _nbiou   remove_readeru   filenou   remove_writeru   closeu	   Exceptionu	   exceptionu   Noneu   _protocol_factoryu   _ssl_contextu   _server_hostnameu   _on_done(   u   selfu   closeu   error(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   _cleanup   s2    	
							c             C   sc   t  j d |  j ƒ |  j |  j k s: t d |  j f ƒ ‚ |  j |  _ |  j j |  j	 ƒ t
 |  ƒ S(   uC   Kick off the workflow

        :rtype: AbstractIOReference
        u!   _AsyncStreamConnector.start(); %su9   _AsyncStreamConnector.start() expected _STATE_NOT_STARTED(   u   _LOGGERu   debugu   _socku   _stateu   _STATE_NOT_STARTEDu   AssertionErroru   _STATE_ACTIVEu   _nbiou   add_callback_threadsafeu   _start_asyncu   _AsyncServiceAsyncHandle(   u   self(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   startÃ  s    c             C   sb   |  j  |  j k rE |  j |  _  t j d |  j ƒ |  j d d ƒ d St j d |  j  |  j ƒ d S(   u¾   Cancel pending connection request without calling user's completion
        callback.

        :returns: False if was already done or cancelled; True otherwise
        :rtype: bool

        u%   User canceled streaming linkup for %su   closeuD   _AsyncStreamConnector cancel requested when not ACTIVE: state=%s; %sTF(	   u   _stateu   _STATE_ACTIVEu   _STATE_CANCELEDu   _LOGGERu   debugu   _socku   _cleanupu   Trueu   False(   u   self(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   cancelÖ  s    c             C   sÚ   t  j d | |  j ƒ t | t t f ƒ sC t d | |  j f ƒ ‚ |  j |  j k sj t d |  j f ƒ ‚ |  j	 |  _ zC y |  j
 | ƒ Wn+ t k
 r· t  j d |  j | ƒ ‚  Yn XWd |  j d t | t ƒ ƒ Xd S(   u  Advance to COMPLETED state, cancel async operation(s), and invoke
        user's completion callback.

        :param BaseException | tuple result: value to pass in user's callback.
            `tuple(transport, protocol)` on success, exception on error

        u0   _AsyncStreamConnector._report_completion(%r); %suQ   _AsyncStreamConnector._report_completion() expected exception or tuple as result.uA   _AsyncStreamConnector._report_completion() expected _STATE_ACTIVEu   %r: _on_done(%r) failed.Nu   close(   u   _LOGGERu   debugu   _socku
   isinstanceu   BaseExceptionu   tupleu   AssertionErroru   _stateu   _STATE_ACTIVEu   _STATE_COMPLETEDu   _on_doneu	   Exceptionu	   exceptionu   _report_completionu   _cleanup(   u   selfu   result(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   _report_completionê  s"    		 	c             C   s  t  j d |  j ƒ |  j |  j k rB t  j d |  j |  j ƒ d S|  j d k r^ |  j ƒ  n£ t  j d |  j ƒ y7 |  j j |  j d d
 d d
 d d
 d |  j
 ƒ|  _ WnL t k
 rö } z, t  j d	 |  j | ƒ |  j | ƒ d SWYd d } ~ Xn X|  j ƒ  d S(   u’   Called as callback from I/O loop to kick-start the workflow, so it's
        safe to call user's completion callback from here if needed

        u(   _AsyncStreamConnector._start_async(); %suM   Abandoning streaming linkup due to inactive state transition; state=%s; %s; .Nu   Starting SSL handshake on %su   server_sideu   do_handshake_on_connectu   suppress_ragged_eofsu   server_hostnameu   SSL wrap_socket(%s) failed: %rF(   u   _LOGGERu   debugu   _socku   _stateu   _STATE_ACTIVEu   _ssl_contextu   Noneu   _linkupu   wrap_socketu   Falseu   _server_hostnameu	   Exceptionu	   exceptionu   _report_completionu   _do_ssl_handshake(   u   selfu   error(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   _start_async
  s,    		c          4   C   s÷  t  j d ƒ d } y™y |  j ƒ  } Wn> t k
 rf } z t  j d | |  j ƒ ‚  WYd d } ~ Xn X|  j d k rÖ y t |  j | |  j	 ƒ } Wq3t k
 rÒ } z t  j d | |  j ƒ ‚  WYd d } ~ Xq3Xn] y t
 |  j | |  j	 ƒ } Wn> t k
 r2} z t  j d | |  j ƒ ‚  WYd d } ~ Xn Xt  j d | ƒ y | j | ƒ WnA t k
 r—} z! t  j d | | |  j ƒ ‚  WYd d } ~ Xn Xt  j d | | ƒ Wn+ t k
 rÙ} z | } WYd d } ~ Xn X| | f } |  j | ƒ d S(	   u}   Connection is ready: instantiate and link up transport and protocol,
        and invoke user's completion callback.

        u   _AsyncStreamConnector._linkup()u'   protocol_factory() failed: error=%r; %sNu%   PlainTransport() failed: error=%r; %su#   SSLTransport() failed: error=%r; %su   _linkup(): created transport %ru1   protocol.connection_made(%r) failed: error=%r; %su2   _linkup(): introduced transport to protocol %r; %r(   u   _LOGGERu   debugu   Noneu   _protocol_factoryu	   Exceptionu	   exceptionu   _socku   _ssl_contextu   _AsyncPlaintextTransportu   _nbiou   _AsyncSSLTransportu   connection_madeu   _report_completion(   u   selfu	   transportu   protocolu   erroru   result(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   _linkup0  sL    				c             C   s7  t  j d ƒ |  j |  j k r< t  j d |  j |  j ƒ d Sd
 } y.y |  j j ƒ  Wný t j k
 rU} zÚ | j	 t j
 k r× t  j d |  j ƒ d |  _ |  j j |  j j ƒ  |  j ƒ |  j j |  j j ƒ  ƒ nl | j	 t j k r@t  j d |  j ƒ d |  _ |  j j |  j j ƒ  |  j ƒ |  j j |  j j ƒ  ƒ n ‚  WYd d } ~ Xn Xd } t  j d |  j ƒ WnL t k
 r¾} z, t  j d | |  j ƒ |  j | ƒ d SWYd d } ~ Xn X| r3t  j d |  j ƒ |  j j |  j j ƒ  ƒ |  j j |  j j ƒ  ƒ d
 |  _ t  j d	 |  j ƒ |  j ƒ  n  d S(   uJ   Perform asynchronous SSL handshake on the already wrapped socket

        u)   _AsyncStreamConnector._do_ssl_handshake()u`   _do_ssl_handshake: Abandoning streaming linkup due to inactive state transition; state=%s; %s; .Nu   SSL handshake wants read; %s.u   SSL handshake wants write. %su(   SSL handshake completed successfully: %su%   SSL do_handshake failed: error=%r; %su8   _do_ssl_handshake: removing watchers ahead of linkup: %su=   _do_ssl_handshake: pre-linkup removal of watchers is done; %sFT(   u   _LOGGERu   debugu   _stateu   _STATE_ACTIVEu   _socku   Falseu   do_handshakeu   sslu   SSLErroru   errnou   SSL_ERROR_WANT_READu   Trueu   _watching_socketu   _nbiou
   set_readeru   filenou   _do_ssl_handshakeu   remove_writeru   SSL_ERROR_WANT_WRITEu
   set_writeru   remove_readeru   infou	   Exceptionu	   exceptionu   _report_completionu   _linkup(   u   selfu   doneu   error(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   _do_ssl_handshakej  sX    
	
	
	

	
N(   u   __name__u
   __module__u   __doc__u   _STATE_NOT_STARTEDu   _STATE_ACTIVEu   _STATE_CANCELEDu   _STATE_COMPLETEDu   __init__u   _log_exceptionsu   _cleanupu   startu   cancelu   _report_completionu   _start_asyncu   _linkupu   _do_ssl_handshake(   u
   __locals__(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   _AsyncStreamConnectord  s   
	0#		 &:u   _AsyncStreamConnectorc             B   sð   |  Ee  Z d  Z d Z d Z d Z d Z d Z d Z Gd „  d	 e	 ƒ Z
 d
 „  Z d „  Z d „  Z d „  Z d „  Z d „  Z d „  Z e e d „  ƒ ƒ Z e e d „  ƒ ƒ Z e d „  ƒ Z e d „  ƒ Z e d „  ƒ Z e d „  ƒ Z d S(   uI   Base class for `_AsyncPlaintextTransport` and `_AsyncSSLTransport`.

    i   i   i   i   i   i   id   c                s#   |  Ee  Z d  Z ‡  f d †  Z ‡  S(   uN   We raise this internally when EOF (empty read) is detected on input.

        c                s    t  t j |  ƒ j d d ƒ d  S(   Ni   u   End of input stream (EOF)iÿÿÿÿ(   u   superu   _AsyncTransportBaseu   RxEndOfFileu   __init__(   u   self(   u	   __class__(    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   __init__½  s    (   u   __name__u
   __module__u   __doc__u   __init__(   u
   __locals__(    (   u	   __class__uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   RxEndOfFile¸  s   
u   RxEndOfFilec             C   sS   t  j d | ƒ | |  _ | |  _ | |  _ |  j |  _ t j ƒ  |  _	 d |  _
 d S(   u~  

        :param socket.socket | ssl.SSLSocket sock: connected socket
        :param pika.adapters.utils.nbio_interface.AbstractStreamProtocol protocol:
            corresponding protocol in this transport/protocol pairing; the
            protocol already had its `connection_made()` method called.
        :param AbstractIOServices | AbstractFileDescriptorServices nbio:

        u    _AsyncTransportBase.__init__: %si    N(   u   _LOGGERu   debugu   _socku	   _protocolu   _nbiou   _STATE_ACTIVEu   _stateu   collectionsu   dequeu   _tx_buffersu   _tx_buffered_byte_count(   u   selfu   socku   protocolu   nbio(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   __init__Á  s    
			c             C   s*   t  j d |  j |  j ƒ |  j d ƒ d S(   u  Close connection abruptly without waiting for pending I/O to
        complete. Will invoke the corresponding protocol's `connection_lost()`
        method asynchronously (not in context of the abort() call).

        :raises Exception: Exception-based exception on error
        u+   Aborting transport connection: state=%s; %sN(   u   _LOGGERu   infou   _stateu   _socku   _initiate_abortu   None(   u   self(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   abortÔ  s    
c             C   s   |  j  S(   u   Return the protocol linked to this transport.

        :rtype: pika.adapters.utils.nbio_interface.AbstractStreamProtocol
        (   u	   _protocol(   u   self(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   get_protocolà  s    c             C   s   |  j  S(   ue   
        :returns: Current size of output data buffered by the transport
        :rtype: int
        (   u   _tx_buffered_byte_count(   u   self(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   get_write_buffer_sizeç  s    c             C   s   | s7 t  j d |  j |  j ƒ t d j | ƒ ƒ ‚ n  |  j |  j k rf t  j d |  j |  j ƒ d S|  j j	 | ƒ |  j
 t | ƒ 7_
 d S(   u–   Buffer the given data until it can be sent asynchronously.

        :param bytes data:
        :raises ValueError: if called with empty data

        u,   write() called with empty data: state=%s; %su#   write() called with empty data {!r}u;   Ignoring write() called during inactive state: state=%s; %sN(   u   _LOGGERu   erroru   _stateu   _socku
   ValueErroru   formatu   _STATE_ACTIVEu   debugu   _tx_buffersu   appendu   _tx_buffered_byte_countu   len(   u   selfu   data(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   _buffer_tx_dataî  s    	c             C   s×   d } xÊ |  j  |  j k rÒ | |  j k  rÒ |  j |  j |  j ƒ } | t | ƒ 7} | sz t j d |  j ƒ |  j	 ƒ  ‚ n  y |  j
 j | ƒ Wq	 t k
 rÎ } z t j d | |  j ƒ ‚  WYd d } ~ Xq	 Xq	 Wd S(   uï  Utility method for use by subclasses to ingest data from socket and
        dispatch it to protocol's `data_received()` method socket-specific
        "try again" exception, per-event data consumption limit is reached,
        transport becomes inactive, or a fatal failure.

        Consumes up to `self._MAX_CONSUME_BYTES` to prevent event starvation or
        until state becomes inactive (e.g., `protocol.data_received()` callback
        aborts the transport)

        :raises: Whatever the corresponding `sock.recv()` raises except the
                 socket error with errno.EINTR
        :raises: Whatever the `protocol.data_received()` callback raises
        :raises _AsyncTransportBase.RxEndOfFile: upon shutdown of input stream

        i    u   Socket EOF; %su-   protocol.data_received() failed: error=%r; %sN(   u   _stateu   _STATE_ACTIVEu   _MAX_CONSUME_BYTESu   _sigint_safe_recvu   _socku   _MAX_RECV_BYTESu   lenu   _LOGGERu   erroru   RxEndOfFileu	   _protocolu   data_receivedu	   Exceptionu	   exception(   u   selfu   bytes_consumedu   datau   error(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   _consume  s    
c             C   sÀ   x¹ |  j  r» |  j |  j |  j  d ƒ } |  j  j ƒ  } | t | ƒ k  r t j d | t | ƒ ƒ |  j  j | | d … ƒ n  |  j | 8_ |  j d k s t	 d |  j |  j
 f ƒ ‚ q Wd S(   u  Utility method for use by subclasses to emit data from tx_buffers.
        This method sends chunks from `tx_buffers` until all chunks are
        exhausted or sending is interrupted by an exception. Maintains integrity
        of `self.tx_buffers`.

        :raises: whatever the corresponding `sock.send()` raises except the
                 socket error with errno.EINTR

        i    u/   Partial send, requeing remaining data; %s of %sNu7   _AsyncTransportBase._produce() tx buffer size underflow(   u   _tx_buffersu   _sigint_safe_sendu   _socku   popleftu   lenu   _LOGGERu   debugu
   appendleftu   _tx_buffered_byte_countu   AssertionErroru   _state(   u   selfu   num_bytes_sentu   chunk(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   _produce(  s    
	c             C   s   |  j  | ƒ S(   um  Receive data from socket, retrying on SIGINT.

        :param sock: stream or SSL socket
        :param max_bytes: maximum number of bytes to receive
        :returns: received data or empty bytes uppon end of file
        :rtype: bytes
        :raises: whatever the corresponding `sock.recv()` raises except socket
                 error with errno.EINTR

        (   u   recv(   u   socku	   max_bytes(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   _sigint_safe_recvA  s    c             C   s   |  j  | ƒ S(   u@  Send data to socket, retrying on SIGINT.

        :param sock: stream or SSL socket
        :param data: data bytes to send
        :returns: number of bytes actually sent
        :rtype: int
        :raises: whatever the corresponding `sock.send()` raises except socket
                 error with errno.EINTR

        (   u   send(   u   socku   data(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   _sigint_safe_sendP  s    c             C   sq   |  j  |  j k rm t j d |  j  |  j ƒ |  j j |  j j ƒ  ƒ |  j j |  j j ƒ  ƒ |  j	 j
 ƒ  n  d S(   u2   Unregister the transport from I/O events

        u$   Deactivating transport: state=%s; %sN(   u   _stateu   _STATE_ACTIVEu   _LOGGERu   infou   _socku   _nbiou   remove_readeru   filenou   remove_writeru   _tx_buffersu   clear(   u   self(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   _deactivate_  s    
c             C   s˜   |  j  |  j k r” t j d |  j  |  j ƒ y |  j j t j ƒ Wn t j	 j
 k
 r\ Yn X|  j j ƒ  d |  _ d |  _ d |  _ |  j |  _  n  d S(   u{   Close the transport's socket and unlink the transport it from
        references to other assets (protocol, etc.)

        u4   Closing transport socket and unlinking: state=%s; %sN(   u   _stateu   _STATE_COMPLETEDu   _LOGGERu   infou   _socku   shutdownu   socketu	   SHUT_RDWRu   pikau   compatu   SOCKET_ERRORu   closeu   Noneu	   _protocolu   _nbio(   u   self(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   _close_and_finalizek  s    				c             C   s  t  j d |  j | |  j ƒ |  j |  j k sC t d |  j f ƒ ‚ |  j |  j k rY d S|  j ƒ  | d k r¡ |  j |  j k r’ t  j	 d ƒ d S|  j |  _ nI |  j |  j
 k rÞ |  j |  j k sÚ t d |  j f ƒ ‚ d S|  j |  _ |  j j t j |  j | ƒ ƒ d S(   u£  Initiate asynchronous abort of the transport that concludes with a
        call to the protocol's `connection_lost()` method. No flushing of
        output buffers will take place.

        :param BaseException | None error: None if being canceled by user,
            including via falsie return value from protocol.eof_received;
            otherwise the exception corresponding to the the failed connection.
        uo   _AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=%s; error=%r; %suB   _AsyncTransportBase._initate_abort() expected non-_STATE_COMPLETEDNuM   _AsyncTransportBase._initiate_abort(): ignoring - user-abort already pending.uD   _AsyncTransportBase._initate_abort() expected _STATE_ABORTED_BY_USER(   u   _LOGGERu   infou   _stateu   _socku   _STATE_COMPLETEDu   AssertionErroru   _deactivateu   Noneu   _STATE_ABORTED_BY_USERu   debugu   _STATE_ACTIVEu   _STATE_FAILEDu   _nbiou   add_callback_threadsafeu	   functoolsu   partialu   _connection_lost_notify_async(   u   selfu   error(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   _initiate_abort~  s,    

	c             C   sã   t  j d |  j | ƒ |  j |  j k r, d S| d k	 ru |  j |  j k ru |  j |  j k sq t d |  j f ƒ ‚ d Sz\ y |  j j	 | ƒ WnA t
 k
 rÏ } z! t  j d | | |  j ƒ ‚  WYd d } ~ Xn XWd |  j ƒ  Xd S(   už  Handle aborting of transport either due to socket error or user-
        initiated `abort()` call. Must be called from an I/O loop callback owned
        by us in order to avoid reentry into user code from user's API call into
        the transport.

        :param BaseException | None error: None if being canceled by user;
            otherwise the exception corresponding to the the failed connection.
        u1   Concluding transport shutdown: state=%s; error=%rNuS   _AsyncTransportBase._connection_lost_notify_async() expected _STATE_ABORTED_BY_USERu/   protocol.connection_lost(%r) failed: exc=%r; %s(   u   _LOGGERu   debugu   _stateu   _STATE_COMPLETEDu   Noneu   _STATE_FAILEDu   _STATE_ABORTED_BY_USERu   AssertionErroru	   _protocolu   connection_lostu	   Exceptionu	   exceptionu   _socku   _close_and_finalize(   u   selfu   erroru   exc(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   _connection_lost_notify_async³  s"    
	 	Ni  (   u   __name__u
   __module__u   __doc__u   _STATE_ACTIVEu   _STATE_FAILEDu   _STATE_ABORTED_BY_USERu   _STATE_COMPLETEDu   _MAX_RECV_BYTESu   _MAX_CONSUME_BYTESu   OSErroru   RxEndOfFileu   __init__u   abortu   get_protocolu   get_write_buffer_sizeu   _buffer_tx_datau   _consumeu   _produceu   staticmethodu   _retry_on_sigintu   _sigint_safe_recvu   _sigint_safe_sendu   _log_exceptionsu   _deactivateu   _close_and_finalizeu   _initiate_abortu   _connection_lost_notify_async(   u
   __locals__(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   _AsyncTransportBase¨  s.   
							%	5u   _AsyncTransportBasec                sJ   |  Ee  Z d  Z ‡  f d †  Z d „  Z e d „  ƒ Z e d „  ƒ Z ‡  S(   u`   Implementation of `nbio_interface.AbstractStreamTransport` for a
    plaintext connection.

    c                s?   t  t |  ƒ j | | | ƒ |  j j |  j j ƒ  |  j ƒ d S(   u{  

        :param socket.socket sock: non-blocking connected socket
        :param pika.adapters.utils.nbio_interface.AbstractStreamProtocol protocol:
            corresponding protocol in this transport/protocol pairing; the
            protocol already had its `connection_made()` method called.
        :param AbstractIOServices | AbstractFileDescriptorServices nbio:

        N(   u   superu   _AsyncPlaintextTransportu   __init__u   _nbiou
   set_readeru   _socku   filenou   _on_socket_readable(   u   selfu   socku   protocolu   nbio(   u	   __class__(    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   __init__Ý  s    
c             C   sŸ   |  j  |  j k r/ t j d |  j  |  j ƒ d S| sM t d | |  j  f ƒ ‚ |  j ƒ  sŽ |  j j |  j j	 ƒ  |  j
 ƒ t j d |  j ƒ n  |  j | ƒ d S(   u–   Buffer the given data until it can be sent asynchronously.

        :param bytes data:
        :raises ValueError: if called with empty data

        u;   Ignoring write() called during inactive state: state=%s; %sNu7   _AsyncPlaintextTransport.write(): empty data from user.u!   Turned on writability watcher: %s(   u   _stateu   _STATE_ACTIVEu   _LOGGERu   debugu   _socku   AssertionErroru   get_write_buffer_sizeu   _nbiou
   set_writeru   filenou   _on_socket_writableu   _buffer_tx_data(   u   selfu   data(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   writeí  s    c             C   sâ  |  j  |  j k r/ t j d |  j  |  j ƒ d Sy |  j ƒ  Wnp|  j k
 ry |  j j ƒ  } WnH t	 k
 r­ } z( t j
 d | |  j ƒ |  j | ƒ WYd d } ~ XnV X| rã t j d |  j ƒ |  j j |  j j ƒ  ƒ n  t j d |  j ƒ |  j d ƒ Yn× t	 t j j f k
 r¯} z} t | t j j ƒ r_| j t k r_t j d |  j ƒ n> t j
 d | |  j d j t j t j ƒ  Œ  ƒ ƒ |  j | ƒ WYd d } ~ Xn/ X|  j  |  j k rÞt j d	 |  j  |  j ƒ n  d S(
   uÜ   Ingest data from socket and dispatch it to protocol until exception
        occurs (typically EAGAIN or EWOULDBLOCK), per-event data consumption
        limit is reached, transport becomes inactive, or failure.

        uE   Ignoring readability notification due to inactive state: state=%s; %sNu,   protocol.eof_received() failed: error=%r; %su0   protocol.eof_received() elected to keep open: %su,   protocol.eof_received() elected to close: %su   Recv would block on %sua   _AsyncBaseTransport._consume() failed, aborting connection: error=%r; sock=%s; Caller's stack:
%su    u>   Leaving Plaintext consumer due to inactive state: state=%s; %s(   u   _stateu   _STATE_ACTIVEu   _LOGGERu   debugu   _socku   _consumeu   RxEndOfFileu	   _protocolu   eof_receivedu	   Exceptionu	   exceptionu   _initiate_abortu   infou   _nbiou   remove_readeru   filenou   Noneu   pikau   compatu   SOCKET_ERRORu
   isinstanceu   errnou   _TRY_IO_AGAIN_SOCK_ERROR_CODESu   joinu	   tracebacku   format_exceptionu   sysu   exc_info(   u   selfu	   keep_openu   error(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   _on_socket_readable  sH    
 
	
 c             C   sC  |  j  |  j k r/ t j d |  j  |  j ƒ d S|  j sM t d |  j  f ƒ ‚ y |  j ƒ  Wn© t t	 j
 j f k
 r} z} t | t	 j
 j ƒ r¶ | j t k r¶ t j d |  j ƒ n> t j d | |  j d j t j t j ƒ  Œ  ƒ ƒ |  j | ƒ WYd d } ~ Xn9 X|  j s?|  j j |  j j ƒ  ƒ t j d |  j ƒ n  d S(   u-   Handle writable socket notification

        uE   Ignoring writability notification due to inactive state: state=%s; %sNuP   _AsyncPlaintextTransport._on_socket_writable() called, but _tx_buffers is empty.u   Send would block on %sua   _AsyncBaseTransport._produce() failed, aborting connection: error=%r; sock=%s; Caller's stack:
%su    u"   Turned off writability watcher: %s(   u   _stateu   _STATE_ACTIVEu   _LOGGERu   debugu   _socku   _tx_buffersu   AssertionErroru   _produceu	   Exceptionu   pikau   compatu   SOCKET_ERRORu
   isinstanceu   errnou   _TRY_IO_AGAIN_SOCK_ERROR_CODESu	   exceptionu   joinu	   tracebacku   format_exceptionu   sysu   exc_infou   _initiate_abortu   _nbiou   remove_writeru   fileno(   u   selfu   error(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   _on_socket_writable7  s,     	(   u   __name__u
   __module__u   __doc__u   __init__u   writeu   _log_exceptionsu   _on_socket_readableu   _on_socket_writable(   u
   __locals__(    (   u	   __class__uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   _AsyncPlaintextTransport×  s
   
	4u   _AsyncPlaintextTransportc                st   |  Ee  Z d  Z ‡  f d †  Z d „  Z e d „  ƒ Z e d „  ƒ Z e ‡  f d †  ƒ Z e ‡  f d †  ƒ Z	 ‡  S(   u\   Implementation of `.nbio_interface.AbstractStreamTransport` for an SSL
    connection.

    c                sg   t  t |  ƒ j | | | ƒ |  j |  _ d |  _ |  j j |  j	 j
 ƒ  |  j ƒ |  j j |  j ƒ d S(   u{  

        :param ssl.SSLSocket sock: non-blocking connected socket
        :param pika.adapters.utils.nbio_interface.AbstractStreamProtocol protocol:
            corresponding protocol in this transport/protocol pairing; the
            protocol already had its `connection_made()` method called.
        :param AbstractIOServices | AbstractFileDescriptorServices nbio:

        N(   u   superu   _AsyncSSLTransportu   __init__u   _consumeu   _ssl_readable_actionu   Noneu   _ssl_writable_actionu   _nbiou
   set_readeru   _socku   filenou   _on_socket_readableu   add_callback_threadsafe(   u   selfu   socku   protocolu   nbio(   u	   __class__(    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   __init__a  s
    
	c             C   s¨   |  j  |  j k r/ t j d |  j  |  j ƒ d S|  j ƒ  d k } |  j | ƒ | r¤ |  j d k r¤ |  j	 |  _ |  j
 j |  j j ƒ  |  j ƒ t j d |  j ƒ n  d S(   u–   Buffer the given data until it can be sent asynchronously.

        :param bytes data:
        :raises ValueError: if called with empty data

        u;   Ignoring write() called during inactive state: state=%s; %sNi    u!   Turned on writability watcher: %s(   u   _stateu   _STATE_ACTIVEu   _LOGGERu   debugu   _socku   get_write_buffer_sizeu   _buffer_tx_datau   _ssl_writable_actionu   Noneu   _produceu   _nbiou
   set_writeru   filenou   _on_socket_writable(   u   selfu   datau   tx_buffer_was_empty(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   writeu  s    c             C   s›   |  j  |  j k r/ t j d |  j  |  j ƒ d S|  j r~ y |  j ƒ  Wq— t k
 rz } z |  j | ƒ WYd d } ~ Xq— Xn t j d |  j |  j ƒ d S(   u+   Handle readable socket indication

        uE   Ignoring readability notification due to inactive state: state=%s; %sNu>   SSL readable action was suppressed: ssl_writable_action=%r; %s(	   u   _stateu   _STATE_ACTIVEu   _LOGGERu   debugu   _socku   _ssl_readable_actionu	   Exceptionu   _initiate_abortu   _ssl_writable_action(   u   selfu   error(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   _on_socket_readable‹  s    	#c             C   s›   |  j  |  j k r/ t j d |  j  |  j ƒ d S|  j r~ y |  j ƒ  Wq— t k
 rz } z |  j | ƒ WYd d } ~ Xq— Xn t j d |  j |  j ƒ d S(   u-   Handle writable socket notification

        uE   Ignoring writability notification due to inactive state: state=%s; %sNu>   SSL writable action was suppressed: ssl_readable_action=%r; %s(	   u   _stateu   _STATE_ACTIVEu   _LOGGERu   debugu   _socku   _ssl_writable_actionu	   Exceptionu   _initiate_abortu   _ssl_readable_action(   u   selfu   error(    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   _on_socket_writable¡  s    	#c                s5  d } y t t |  ƒ j ƒ  Wn² t j k
 rÑ } z | j t j k r] t j	 d |  j
 ƒ nb | j t j k r‹ t j	 d |  j
 ƒ d } n4 t j d | |  j
 d j t j t j ƒ  Œ  ƒ ƒ ‚  WYd d } ~ XnC X|  j |  j k rt j	 d |  j |  j
 ƒ d S|  j j |  j ƒ | r‹|  j sE|  j j |  j
 j ƒ  |  j ƒ n  |  j |  _ |  j |  j k rð|  j j |  j
 j ƒ  ƒ d |  _ qðne |  j s¶|  j j |  j
 j ƒ  |  j ƒ n  |  j |  _ |  j rð|  j j  |  j
 j ƒ  ƒ d |  _ n  |  j! r1|  j r1|  j" |  _ |  j j |  j
 j ƒ  |  j ƒ n  d S(	   u“  [override] Ingest data from socket and dispatch it to protocol until
        exception occurs (typically ssl.SSLError with
        SSL_ERROR_WANT_READ/WRITE), per-event data consumption limit is reached,
        transport becomes inactive, or failure.

        Update consumer/producer registration.

        :raises Exception: error that signals that connection needs to be
            aborted
        u   SSL ingester wants read: %su   SSL ingester wants write: %sua   _AsyncBaseTransport._consume() failed, aborting connection: error=%r; sock=%s; Caller's stack:
%su    Nu8   Leaving SSL consumer due to inactive state: state=%s; %sTF(#   u   Trueu   superu   _AsyncSSLTransportu   _consumeu   sslu   SSLErroru   errnou   SSL_ERROR_WANT_READu   _LOGGERu   debugu   _socku   SSL_ERROR_WANT_WRITEu   Falseu	   exceptionu   joinu	   tracebacku   format_exceptionu   sysu   exc_infou   _stateu   _STATE_ACTIVEu   _nbiou   add_callback_threadsafeu   _on_socket_readableu   _ssl_readable_actionu
   set_readeru   filenou   _ssl_writable_actionu   remove_writeru   Noneu
   set_writeru   _on_socket_writableu   remove_readeru   _tx_buffersu   _produce(   u   selfu   next_consume_on_readableu   error(   u	   __class__(    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   _consume·  sL    				c                s5  d } y t t |  ƒ j ƒ  Wn¸ t j k
 r× } z• | j t j k rc t j	 d |  j
 ƒ d } nb | j t j k r‘ t j	 d |  j
 ƒ d } n4 t j d | |  j
 d j t j t j ƒ  Œ  ƒ ƒ ‚  WYd d } ~ Xn& X|  j sý t d t |  j ƒ f ƒ ‚ |  j r| d k	 s't d |  j f ƒ ‚ | rž|  j sX|  j j |  j
 j ƒ  |  j ƒ n  |  j |  _ |  j |  j k r|  j j |  j
 j ƒ  ƒ d |  _ qqÂ|  j sÉ|  j j |  j
 j ƒ  |  j  ƒ n  |  j |  _ |  j rÂ|  j j! |  j
 j ƒ  ƒ d |  _ qÂn¼ |  j |  j k rd|  j j |  j
 j ƒ  ƒ d |  _ |  j |  j k sÂt d |  j f ƒ ‚ n^ |  j |  j k s t d	 d
 |  j d |  j d |  j f ƒ ‚ d |  _ |  j j! |  j
 j ƒ  ƒ |  j s|  j" |  _ |  j j |  j
 j ƒ  |  j  ƒ |  j j# |  j  ƒ n% |  j
 j$ ƒ  r1|  j j# |  j  ƒ n  d S(   uI  [override] Emit data from tx_buffers all chunks are exhausted or
        sending is interrupted by an exception (typically ssl.SSLError with
        SSL_ERROR_WANT_READ/WRITE).

        Update consumer/producer registration.

        :raises Exception: error that signals that connection needs to be
            aborted

        u   SSL emitter wants read: %su   SSL emitter wants write: %sua   _AsyncBaseTransport._produce() failed, aborting connection: error=%r; sock=%s; Caller's stack:
%su    Nu_   _AsyncSSLTransport._produce(): no exception from parent class, but data remains in _tx_buffers.uE   _AsyncSSLTransport._produce(): next_produce_on_writable is still Noneur   _AsyncSSLTransport._produce(): with empty tx_buffers, writable_action cannot be _produce when readable is _produceu   _AsyncSSLTransport._produce(): with empty tx_buffers, expected writable_action as _produce when readable_action is not _produceu   writable_action:u   readable_action:u   state:FT(%   u   Noneu   superu   _AsyncSSLTransportu   _produceu   sslu   SSLErroru   errnou   SSL_ERROR_WANT_READu   _LOGGERu   debugu   _socku   Falseu   SSL_ERROR_WANT_WRITEu   Trueu	   exceptionu   joinu	   tracebacku   format_exceptionu   sysu   exc_infou   _tx_buffersu   AssertionErroru   lenu   _stateu   _ssl_writable_actionu   _nbiou
   set_writeru   filenou   _on_socket_writableu   _ssl_readable_actionu   remove_readeru
   set_readeru   _on_socket_readableu   remove_writeru   _consumeu   add_callback_threadsafeu   pending(   u   selfu   next_produce_on_writableu   error(   u	   __class__(    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   _produce   sr    										(
   u   __name__u
   __module__u   __doc__u   __init__u   writeu   _log_exceptionsu   _on_socket_readableu   _on_socket_writableu   _consumeu   _produce(   u
   __locals__(    (   u	   __class__uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   _AsyncSSLTransport[  s   
	Iu   _AsyncSSLTransport((   u   __doc__u   collectionsu   errnou	   functoolsu   loggingu   numbersu   osu   socketu   sslu   sysu	   tracebacku"   pika.adapters.utils.nbio_interfaceu   AbstractIOReferenceu   AbstractStreamTransportu   pika.compatu   pikau   pika.diagnostic_utilsu   EAGAINu   EWOULDBLOCKu   _TRY_IO_AGAIN_SOCK_ERROR_CODESu   EINPROGRESSu(   _CONNECTION_IN_PROGRESS_SOCK_ERROR_CODESu	   getLoggeru   __name__u   _LOGGERu   diagnostic_utilsu   create_log_exception_decoratoru   _log_exceptionsu   check_callback_argu   check_fd_argu   _retry_on_sigintu   objectu   SocketConnectionMixinu   StreamingConnectionMixinu   _AsyncServiceAsyncHandleu   _AsyncSocketConnectoru   _AsyncStreamConnectoru   _AsyncTransportBaseu   _AsyncPlaintextTransportu   _AsyncSSLTransport(    (    (    uJ   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/io_services_utils.pyu   <module>   sB   			)¹ÿ E
	ÿ /„