l
Ì	g]c               @   s  d  Z  d d l Z d d l Z d d l Z d d l Z d d l Z d d l Z d d l m Z e j	 e
 ƒ Z Gd „  d e ƒ Z Gd „  d e ƒ Z Gd „  d	 e ƒ Z Gd
 „  d e ƒ Z Gd „  d e ƒ Z Gd „  d e ƒ Z Gd „  d e ƒ Z Gd „  d e ƒ Z Gd „  d e ƒ Z Gd „  d e ƒ Z Gd „  d e ƒ Z Gd „  d e ƒ Z Gd „  d e j j ƒ Z Gd „  d e ƒ Z d S(    u,  Implements `AMQPConnectionWorkflow` - the default workflow of performing
multiple TCP/[SSL]/AMQP connection attempts with timeouts and retries until one
succeeds or all attempts fail.

Defines the interface `AbstractAMQPConnectionWorkflow` that facilitates
implementing custom connection workflows.

i    N(   u   __version__c             B   s   |  Ee  Z d  Z d S(   u   Base exception for this moduleN(   u   __name__u
   __module__u   __doc__(   u
   __locals__(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   AMQPConnectorException   s   
u   AMQPConnectorExceptionc             B   s   |  Ee  Z d  Z d S(   u:   Overall TCP/[SSL]/AMQP stack connection attempt timed out.N(   u   __name__u
   __module__u   __doc__(   u
   __locals__(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   AMQPConnectorStackTimeout   s   
u   AMQPConnectorStackTimeoutc             B   s   |  Ee  Z d  Z d S(   u    Asynchronous request was abortedN(   u   __name__u
   __module__u   __doc__(   u
   __locals__(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   AMQPConnectorAborted   s   
u   AMQPConnectorAbortedc             B   s   |  Ee  Z d  Z d S(   uj   AMQPConnector operation requested in wrong state, such as aborting after
    completion was reported.
    N(   u   __name__u
   __module__u   __doc__(   u
   __locals__(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   AMQPConnectorWrongState"   s   
u   AMQPConnectorWrongStatec                s,   |  Ee  Z d  Z ‡  f d †  Z d „  Z ‡  S(   uM   Wrapper for exception that occurred during a particular bring-up phase.

    c                s#   t  t |  ƒ j | Œ  | |  _ d S(   uÈ   

        :param BaseException exception: error that occurred while waiting for a
            subclass-specific protocol bring-up phase to complete.
        :param args: args for parent class
        N(   u   superu   AMQPConnectorPhaseErrorBaseu   __init__u	   exception(   u   selfu	   exceptionu   args(   u	   __class__(    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   __init__-   s    c             C   s   d j  |  j j |  j ƒ S(   Nu   {}: {!r}(   u   formatu	   __class__u   __name__u	   exception(   u   self(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   __repr__7   s    (   u   __name__u
   __module__u   __doc__u   __init__u   __repr__(   u
   __locals__(    (   u	   __class__uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   AMQPConnectorPhaseErrorBase(   s   

u   AMQPConnectorPhaseErrorBasec             B   s   |  Ee  Z d  Z d S(   u*   Error connecting TCP socket to remote peerN(   u   __name__u
   __module__u   __doc__(   u
   __locals__(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   AMQPConnectorSocketConnectError;   s   
u   AMQPConnectorSocketConnectErrorc             B   s   |  Ee  Z d  Z d S(   uO   Error setting up transport after TCP connected but before AMQP handshake.

    N(   u   __name__u
   __module__u   __doc__(   u
   __locals__(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu    AMQPConnectorTransportSetupError?   s   
u    AMQPConnectorTransportSetupErrorc             B   s   |  Ee  Z d  Z d S(   u   Error during AMQP handshakeN(   u   __name__u
   __module__u   __doc__(   u
   __locals__(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   AMQPConnectorAMQPHandshakeErrorE   s   
u   AMQPConnectorAMQPHandshakeErrorc             B   s   |  Ee  Z d  Z d S(   u%   AMQP Connection workflow was aborted.N(   u   __name__u
   __module__u   __doc__(   u
   __locals__(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   AMQPConnectionWorkflowAbortedI   s   
u   AMQPConnectionWorkflowAbortedc             B   s   |  Ee  Z d  Z d S(   uu   AMQP Connection Workflow operation requested in wrong state, such as
    aborting after completion was reported.
    N(   u   __name__u
   __module__u   __doc__(   u
   __locals__(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu    AMQPConnectionWorkflowWrongStateM   s   
u    AMQPConnectionWorkflowWrongStatec                s,   |  Ee  Z d  Z ‡  f d †  Z d „  Z ‡  S(   u5   Indicates that AMQP connection workflow failed.

    c                s)   t  t |  ƒ j | Œ  t | ƒ |  _ d S(   u˜   
        :param sequence exceptions: Exceptions that occurred during the
            workflow.
        :param args: args to pass to base class

        N(   u   superu   AMQPConnectionWorkflowFailedu   __init__u   tupleu
   exceptions(   u   selfu
   exceptionsu   args(   u	   __class__(    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   __init__X   s    c             C   sN   d j  |  j j t |  j ƒ |  j d t |  j ƒ d k rG |  j d n d  ƒ S(   NuG   {}: {} exceptions in all; last exception - {!r}; first exception - {!r}i   i    iÿÿÿÿ(   u   formatu	   __class__u   __name__u   lenu
   exceptionsu   None(   u   self(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   __repr__b   s    
(   u   __name__u
   __module__u   __doc__u   __init__u   __repr__(   u
   __locals__(    (   u	   __class__uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   AMQPConnectionWorkflowFailedS   s   

u   AMQPConnectionWorkflowFailedc             B   s¤   |  Ee  Z d  Z d Z d Z d Z d Z d Z d Z d Z	 d „  Z
 d	 „  Z d
 „  Z d „  Z d „  Z d „  Z d „  Z d „  Z d „  Z d „  Z d d „ Z d S(   u;   Performs a single TCP/[SSL]/AMQP connection workflow.

    i    i   i   i   i   i   i   c             C   sj   | |  _  | |  _ d |  _ d |  _ d |  _ d |  _ d |  _ d |  _ d |  _	 d |  _
 |  j |  _ d S(   uª  

        :param callable conn_factory: A function that takes
            `pika.connection.Parameters` as its only arg and returns a brand new
            `pika.connection.Connection`-based adapter instance each time it is
            called. The factory must instantiate the connection with
            `internal_connection_workflow=False`.
        :param pika.adapters.utils.nbio_interface.AbstractIOServices nbio:

        N(   u   _conn_factoryu   _nbiou   Noneu   _addr_recordu   _conn_paramsu   _on_doneu   _tcp_timeout_refu   _stack_timeout_refu	   _task_refu   _socku
   _amqp_connu   _STATE_INITu   _state(   u   selfu   conn_factoryu   nbio(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   __init__w   s    										c             C   s„  |  j  |  j k r- t d j |  j  ƒ ƒ ‚ n  | |  _ | |  _ | |  _ |  j |  _  t j |  j d d … Œ  |  _	 |  j	 j
 t j j t j d ƒ t j j |  j j |  j	 ƒ |  j	 j d ƒ |  j d } t j d t | ƒ |  j j |  j	 | d |  j ƒ|  _ d |  _ |  j j d k	 rA|  j j |  j j |  j ƒ |  _ n  d |  _  |  j j! d k	 r€|  j j |  j j! |  j" ƒ |  _  n  d S(	   u—  Asynchronously perform a single TCP/[SSL]/AMQP connection attempt.

        :param tuple addr_record: a single resolved address record compatible
            with `socket.getaddrinfo()` format.
        :param pika.connection.Parameters conn_params:
        :param callable on_done: Function to call upon completion of the
            workflow: `on_done(pika.connection.Connection | BaseException)`. If
            exception, it's going to be one of the following:
                `AMQPConnectorSocketConnectError`
                `AMQPConnectorTransportSetupError`
                `AMQPConnectorAMQPHandshakeError`
                `AMQPConnectorAborted`

        u)   Already in progress or finished; state={}Ni   i   i   u    Pika version %s connecting to %ru   on_doneF(#   u   _stateu   _STATE_INITu   AMQPConnectorWrongStateu   formatu   _addr_recordu   _conn_paramsu   _on_doneu
   _STATE_TCPu   socketu   _socku
   setsockoptu   pikau   compatu   SOL_TCPu   TCP_NODELAYu   tcp_socket_optsu   set_sock_optsu   tcp_optionsu   setblockingu   Falseu   _LOGu   infou   __version__u   _nbiou   connect_socketu   _on_tcp_connection_doneu	   _task_refu   Noneu   _tcp_timeout_refu   socket_timeoutu
   call_lateru   _on_tcp_connection_timeoutu   _stack_timeout_refu   stack_timeoutu   _on_overall_timeout(   u   selfu   addr_recordu   conn_paramsu   on_doneu   addr(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   start“   s2    			
						c             C   s  |  j  |  j k r! t d ƒ ‚ n  |  j  |  j k rB t d ƒ ‚ n  |  j |  _  |  j ƒ  t j d |  j j	 |  j
 ƒ |  j d
 k rµ t j d ƒ |  j j t j |  j t ƒ  ƒ ƒ nf |  j j sä t j d ƒ |  j j d d ƒ n7 t j d ƒ |  j  |  j k st d	 j |  j  ƒ ƒ ‚ d
 S(   uš  Abort the workflow asynchronously. The completion callback will be
        called with an instance of AMQPConnectorAborted.

        NOTE: we can't cancel/close synchronously because aborting pika
        Connection and its transport requires an asynchronous operation.

        :raises AMQPConnectorWrongState: If called after completion has been
            reported or the workflow not started yet.
        u   Cannot abort before starting.u*   Cannot abort after completion was reporteduC   AMQPConnector: beginning client-initiated asynchronous abort; %r/%suX   AMQPConnector.abort(): no connection, so just scheduling completion report via I/O loop.u*   AMQPConnector.abort(): closing Connection.i@  u3   Client-initiated abort of AMQP Connection Workflow.uC   AMQPConnector.abort(): closing of Connection was already initiated.u9   Connection is closing, but not in TIMEOUT state; state={}N(   u   _stateu   _STATE_INITu   AMQPConnectorWrongStateu   _STATE_DONEu   _STATE_ABORTINGu   _deactivateu   _LOGu   infou   _conn_paramsu   hostu   _addr_recordu
   _amqp_connu   Noneu   debugu   _nbiou   add_callback_threadsafeu	   functoolsu   partialu   _report_completion_and_cleanupu   AMQPConnectorAbortedu
   is_closingu   closeu   _STATE_TIMEOUTu   AssertionErroru   format(   u   self(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   abortÄ   s,    

		c             C   sf   |  j  ƒ  |  j d k	 r2 |  j j ƒ  d |  _ n  d |  _ d |  _ d |  _ d |  _ |  j |  _	 d S(   uq   Cancel asynchronous tasks and clean up to assist garbage collection.

        Transition to STATE_DONE.

        N(
   u   _deactivateu   _socku   Noneu   closeu   _conn_factoryu   _nbiou   _addr_recordu   _on_doneu   _STATE_DONEu   _state(   u   self(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   _closeò   s    
				c             C   s£   |  j  d k s' t d j |  j ƒ ƒ ‚ |  j d k	 rO |  j j ƒ  d |  _ n  |  j d k	 rw |  j j ƒ  d |  _ n  |  j d k	 rŸ |  j j ƒ  d |  _ n  d S(   u$   Cancel asynchronous tasks.

        u:   _deactivate called with self._amqp_conn not None; state={}N(	   u
   _amqp_connu   Noneu   AssertionErroru   formatu   _stateu   _tcp_timeout_refu   cancelu   _stack_timeout_refu	   _task_ref(   u   self(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   _deactivate  s    c             C   sS   t  | t ƒ r" t j d | ƒ n t j d | ƒ |  j } |  j ƒ  | | ƒ d S(   u´   Clean up and invoke client's `on_done` callback.

        :param pika.connection.Connection | BaseException result: value to pass
            to user's `on_done` callback.
        u%   AMQPConnector - reporting failure: %ru%   AMQPConnector - reporting success: %rN(   u
   isinstanceu   BaseExceptionu   _LOGu   erroru   infou   _on_doneu   _close(   u   selfu   resultu   on_done(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   _report_completion_and_cleanup  s    	
c             C   sD   d |  _ t t j d j |  j j |  j ƒ ƒ ƒ } |  j	 | ƒ d S(   ut   Handle TCP connection timeout

        Reports AMQPConnectorSocketConnectError with socket.timeout inside.

        u)   TCP connection attempt timed out: {!r}/{}N(
   u   Noneu   _tcp_timeout_refu   AMQPConnectorSocketConnectErroru   socketu   timeoutu   formatu   _conn_paramsu   hostu   _addr_recordu   _report_completion_and_cleanup(   u   selfu   error(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   _on_tcp_connection_timeout,  s
    	c             C   sD  d |  _ |  j } |  j |  _ | |  j k r¯ d j |  j j |  j t	 |  j j
 ƒ ƒ } t j | ƒ |  j j s‰ t d j |  j ƒ ƒ ‚ |  j j s« |  j j d | ƒ n  d S| |  j k rè t t d j |  j j |  j ƒ ƒ ƒ } nK | |  j k sý t ‚ t t d j |  j j |  j t	 |  j j
 ƒ ƒ ƒ ƒ } |  j | ƒ d S(   uË  Handle overall TCP/[SSL]/AMQP connection attempt timeout by reporting
        `Timeout` error to the client.

        Reports AMQPConnectorSocketConnectError if timeout occurred during
            socket TCP connection attempt.
        Reports AMQPConnectorTransportSetupError if timeout occurred during
            tramsport [SSL] setup attempt.
        Reports AMQPConnectorAMQPHandshakeError if timeout occurred during
            AMQP handshake.

        u0   Timeout while setting up AMQP to {!r}/{}; ssl={}u   Unexpected open state of {!r}i@  Nu*   Timeout while connecting socket to {!r}/{}u5   Timeout while setting up transport to {!r}/{}; ssl={}(   u   Noneu   _stack_timeout_refu   _stateu   _STATE_TIMEOUTu   _STATE_AMQPu   formatu   _conn_paramsu   hostu   _addr_recordu   boolu   ssl_optionsu   _LOGu   erroru
   _amqp_connu   is_openu   AssertionErroru
   is_closingu   closeu
   _STATE_TCPu   AMQPConnectorSocketConnectErroru   AMQPConnectorStackTimeoutu   _STATE_TRANSPORTu    AMQPConnectorTransportSetupErroru   _report_completion_and_cleanup(   u   selfu
   prev_stateu   msgu   error(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   _on_overall_timeout9  s2    		c             C   s6  d |  _ |  j d k	 r1 |  j j ƒ  d |  _ n  | d k	 rj t j d | |  j ƒ |  j t | ƒ ƒ d St j	 d |  j
 ƒ |  j |  _ d } } |  j j d k	 rá |  j j j } |  j j j } | d k rá |  j j } qá n  |  j j d t j |  j |  j ƒ d |  j
 d | d | d |  j ƒ |  _ d |  _
 d S(	   u  Handle completion of asynchronous socket connection attempt.

        Reports AMQPConnectorSocketConnectError if TCP socket connection
            failed.

        :param None|BaseException exc: None on success; exception object on
            failure

        u*   TCP Connection attempt failed: %r; dest=%rNu)   TCP connection to broker established: %r.u   protocol_factoryu   socku   ssl_contextu   server_hostnameu   on_done(   u   Noneu	   _task_refu   _tcp_timeout_refu   cancelu   _LOGu   erroru   _addr_recordu   _report_completion_and_cleanupu   AMQPConnectorSocketConnectErroru   debugu   _socku   _STATE_TRANSPORTu   _stateu   _conn_paramsu   ssl_optionsu   contextu   server_hostnameu   hostu   _nbiou   create_streaming_connectionu	   functoolsu   partialu   _conn_factoryu    _on_transport_establishment_done(   u   selfu   excu   ssl_contextu   server_hostname(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   _on_tcp_connection_doneg  s4    
	

	c             C   s¸   d |  _ t | t ƒ r] t j d | |  j j |  j t	 |  j j
 ƒ ƒ |  j t | ƒ ƒ d St j d | ƒ | \ } |  _ |  j |  _ |  j j |  j d d ƒ|  j j |  j ƒ d S(   uQ  Handle asynchronous completion of
        `AbstractIOServices.create_streaming_connection()`

        Reports AMQPConnectorTransportSetupError if transport ([SSL]) setup
            failed.

        :param sequence|BaseException result: On success, a two-tuple
            (transport, protocol); on failure, exception instance.

        uC   Attempt to create the streaming transport failed: %r; %r/%s; ssl=%sNu"   Streaming transport linked up: %r.u   remove_defaultT(   u   Noneu	   _task_refu
   isinstanceu   BaseExceptionu   _LOGu   erroru   _conn_paramsu   hostu   _addr_recordu   boolu   ssl_optionsu   _report_completion_and_cleanupu    AMQPConnectorTransportSetupErroru   infou
   _amqp_connu   _STATE_AMQPu   _stateu   add_on_open_error_callbacku   _on_amqp_handshake_doneu   Trueu   add_on_open_callback(   u   selfu   resultu
   _transport(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu    _on_transport_establishment_done”  s    		c             C   s9  t  j d |  j | |  j j |  j ƒ d |  _ |  j |  j k rL t	 ƒ  } nÜ |  j |  j
 k r— t t d j |  j j |  j t |  j j ƒ ƒ ƒ ƒ } n‘ |  j |  j k r| d k rÝ t  j d |  j j |  j | ƒ | } q(t  j d |  j j |  j | ƒ t | ƒ } n t  j d |  j | | ƒ d S|  j | ƒ d S(   uµ  Handle completion of AMQP connection handshake attempt.

        NOTE: we handle two types of callbacks - success with just connection
        arg as well as the open-error callback with connection and error

        Reports AMQPConnectorAMQPHandshakeError if AMQP handshake failed.

        :param pika.connection.Connection connection:
        :param BaseException | None error: None on success, otherwise
            failure

        uJ   AMQPConnector: AMQP handshake attempt completed; state=%s; error=%r; %r/%su,   Timeout during AMQP handshake{!r}/{}; ssl={}u8   AMQPConnector: AMQP connection established for %r/%s: %ru=   AMQPConnector: AMQP connection handshake failed for %r/%s: %rug   AMQPConnector: Ignoring AMQP handshake completion notification due to wrong state=%s; error=%r; conn=%rN(   u   _LOGu   debugu   _stateu   _conn_paramsu   hostu   _addr_recordu   Noneu
   _amqp_connu   _STATE_ABORTINGu   AMQPConnectorAbortedu   _STATE_TIMEOUTu   AMQPConnectorAMQPHandshakeErroru   AMQPConnectorStackTimeoutu   formatu   boolu   ssl_optionsu   _STATE_AMQPu   _report_completion_and_cleanup(   u   selfu
   connectionu   erroru   result(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   _on_amqp_handshake_done¶  s:    
		N(   u   __name__u
   __module__u   __doc__u   _STATE_INITu
   _STATE_TCPu   _STATE_TRANSPORTu   _STATE_AMQPu   _STATE_TIMEOUTu   _STATE_ABORTINGu   _STATE_DONEu   __init__u   startu   abortu   _closeu   _deactivateu   _report_completion_and_cleanupu   _on_tcp_connection_timeoutu   _on_overall_timeoutu   _on_tcp_connection_doneu    _on_transport_establishment_doneu   Noneu   _on_amqp_handshake_done(   u
   __locals__(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   AMQPConnectorj   s&   
		1	.					.	-	"u   AMQPConnectorc             B   s&   |  Ee  Z d  Z d „  Z d „  Z d S(   uM   Interface for implementing a custom TCP/[SSL]/AMQP connection workflow.

    c             C   s
   t  ‚ d S(   u¦  Asynchronously perform the workflow until success or all retries
        are exhausted. Called by the adapter.

        :param sequence connection_configs: A sequence of one or more
            `pika.connection.Parameters`-based objects. Will attempt to connect
            using each config in the given order.
        :param callable connector_factory: call it without args to obtain a new
            instance of `AMQPConnector` for each connection attempt.
            See `AMQPConnector` for details.
        :param native_loop: Native I/O loop passed by app to the adapter or
            obtained by the adapter by default.
        :param callable on_done: Function to call upon completion of the
            workflow:
            `on_done(pika.connection.Connection |
                     AMQPConnectionWorkflowFailed |
                     AMQPConnectionWorkflowAborted)`.
            `Connection`-based adapter on success,
            `AMQPConnectionWorkflowFailed` on failure,
            `AMQPConnectionWorkflowAborted` if workflow was aborted.

        :raises AMQPConnectionWorkflowWrongState: If called in wrong state, such
            as after starting the workflow.
        N(   u   NotImplementedError(   u   selfu   connection_configsu   connector_factoryu   native_loopu   on_done(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   startñ  s    c             C   s
   t  ‚ d S(   u·  Abort the workflow asynchronously. The completion callback will be
        called with an instance of AMQPConnectionWorkflowAborted.

        NOTE: we can't cancel/close synchronously because aborting pika
        Connection and its transport requires an asynchronous operation.

        :raises AMQPConnectionWorkflowWrongState: If called in wrong state, such
            as before starting or after completion has been reported.
        N(   u   NotImplementedError(   u   self(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   abort  s    
N(   u   __name__u
   __module__u   __doc__u   startu   abort(   u
   __locals__(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   AbstractAMQPConnectionWorkflowì  s   
	u   AbstractAMQPConnectionWorkflowc             B   s­   |  Ee  Z d  Z e j Z e j Z d Z d Z	 d Z
 d Z d d „ Z d „  Z d „  Z d „  Z d	 „  Z d
 „  Z d „  Z d „  Z d „  Z d „  Z d „  Z d „  Z d S(   u  Implements Pika's default workflow for performing multiple TCP/[SSL]/AMQP
    connection attempts with timeouts and retries until one succeeds or all
    attempts fail.

    The workflow:
        while not success and retries remain:
            1. For each given config (pika.connection.Parameters object):
                A. Perform DNS resolution of the config's host.
                B. Attempt to establish TCP/[SSL]/AMQP for each resolved address
                   until one succeeds, in which case we're done.
            2. If all configs failed but retries remain, resume from beginning
               after the given retry pause. NOTE: failure of DNS resolution
               is equivalent to one cycle and will be retried after the pause
               if retries remain.

    i    i   i   i   c             C   s|   d |  _ d |  _ | |  _ d |  _ d |  _ d |  _ d |  _ d |  _ d |  _	 d |  _
 d |  _ g  |  _ |  j |  _ d S(   uã  
        :param int | float retry_pause: Non-negative number of seconds to wait
            before retrying the config sequence. Meaningful only if retries is
            greater than 0. Defaults to 2 seconds.
        :param bool _until_first_amqp_attempt: INTERNAL USE ONLY; ends workflow
            after first AMQP handshake attempt, regardless of outcome (success
            or failure). The automatic connection logic in
            `pika.connection.Connection` enables this because it's not
            designed/tested to reset all state properly to handle more than one
            AMQP handshake attempt.

        TODO: Do we need getaddrinfo timeout?
        TODO: Would it be useful to implement exponential back-off?

        N(   u   Noneu   _attempts_remainingu   _retry_pauseu   _until_first_amqp_attemptu   _nbiou   _current_config_indexu   _connection_configsu   _connector_factoryu   _on_doneu
   _connectoru	   _task_refu   _addrinfo_iteru   _connection_errorsu   _STATE_INITu   _state(   u   selfu   _until_first_amqp_attempt(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   __init__3  s    												c             C   s   | |  _  d S(   uî  Called by the conneciton adapter only on pika's
        `AMQPConnectionWorkflow` instance to provide it the adapter-specific
        `AbstractIOServices` object before calling the `start()` method.

        NOTE: Custom workflow implementations should use the native I/O loop
        directly because `AbstractIOServices` is private to Pika
        implementation and its interface may change without notice.

        :param pika.adapters.utils.nbio_interface.AbstractIOServices nbio:

        N(   u   _nbio(   u   selfu   nbio(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   set_io_services]  s    c             C   s  |  j  |  j k r- t d j |  j  ƒ ƒ ‚ n  y t | ƒ Wn: t k
 rw } z t d j | ƒ ƒ ‚ WYd d } ~ Xn X| s– t d j | ƒ ƒ ‚ n  | |  _ | |  _	 | |  _
 | d	 j |  _ | d
 j |  _ |  j |  _  t j d ƒ |  j j d t j |  j d d ƒƒ |  _ d S(   uf  Override `AbstractAMQPConnectionWorkflow.start()`.

        NOTE: This implementation uses `connection_attempts` and `retry_delay`
        values from the last element of the given `connection_configs` sequence
        as the overall number of connection attempts of the entire
        `connection_configs` sequence and pause between each sequence.

        u)   Already in progress or finished; state={}u3   connection_configs does not support iteration: {!r}Nu"   connection_configs is empty: {!r}.i   u1   Starting AMQP Connection workflow asynchronously.i    u   firstiÿÿÿÿiÿÿÿÿT(   u   _stateu   _STATE_INITu   AMQPConnectorWrongStateu   formatu   iteru	   Exceptionu	   TypeErroru
   ValueErroru   _connection_configsu   _connector_factoryu   _on_doneu   connection_attemptsu   _attempts_remainingu   retry_delayu   _retry_pauseu   _STATE_ACTIVEu   _LOGu   debugu   _nbiou
   call_lateru	   functoolsu   partialu   _start_new_cycle_asyncu   Trueu	   _task_ref(   u   selfu   connection_configsu   connector_factoryu   native_loopu   on_doneu   error(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   startk  s*    				c             C   sÄ   |  j  |  j k r! t d ƒ ‚ n! |  j  |  j k rB t d ƒ ‚ n  |  j |  _  |  j ƒ  t j d ƒ |  j d k r¦ t j
 d ƒ |  j j t j |  j t ƒ  ƒ ƒ n t j
 d ƒ |  j j ƒ  d S(   u<   Override `AbstractAMQPConnectionWorkflow.abort()`.

        u   Cannot abort before starting.u*   Cannot abort after completion was reporteduF   AMQPConnectionWorkflow: beginning client-initiated asynchronous abort.u`   AMQPConnectionWorkflow.abort(): no connector, so just scheduling completion report via I/O loop.u=   AMQPConnectionWorkflow.abort(): requesting connector.abort().N(   u   _stateu   _STATE_INITu   AMQPConnectorWrongStateu   _STATE_DONEu   _STATE_ABORTINGu   _deactivateu   _LOGu   infou
   _connectoru   Noneu   debugu   _nbiou   add_callback_threadsafeu	   functoolsu   partialu   _report_completion_and_cleanupu   AMQPConnectionWorkflowAbortedu   abort(   u   self(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   abort—  s    
	c             C   sY   |  j  ƒ  d |  _ d |  _ d |  _ d |  _ d |  _ d |  _ d |  _ |  j	 |  _
 d S(   ur   Cancel asynchronous tasks and clean up to assist garbage collection.

        Transition to _STATE_DONE.

        N(   u   _deactivateu   Noneu   _connection_configsu   _nbiou   _connector_factoryu   _on_doneu
   _connectoru   _addrinfo_iteru   _connection_errorsu   _STATE_DONEu   _state(   u   self(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   _close²  s    
							c             C   s,   |  j  d k	 r( |  j  j ƒ  d |  _  n  d S(   u$   Cancel asynchronous tasks.

        N(   u	   _task_refu   Noneu   cancel(   u   self(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   _deactivateÄ  s    c             C   sS   t  | t ƒ r" t j d | ƒ n t j d | ƒ |  j } |  j ƒ  | | ƒ d S(   uÃ   Clean up and invoke client's `on_done` callback.

        :param pika.connection.Connection | AMQPConnectionWorkflowFailed result:
            value to pass to user's `on_done` callback.
        u.   AMQPConnectionWorkflow - reporting failure: %ru.   AMQPConnectionWorkflow - reporting success: %rN(   u
   isinstanceu   BaseExceptionu   _LOGu   erroru   infou   _on_doneu   _close(   u   selfu   resultu   on_done(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   _report_completion_and_cleanupÌ  s    	
c             C   s¿   d |  _ |  j d k s' t |  j ƒ ‚ |  j d k rf t |  j ƒ } t j d | ƒ |  j | ƒ d S|  j d 8_ t j	 d |  j ƒ d |  _
 |  j j | r¦ d n |  j |  j ƒ |  _ d S(   uQ  Start a new workflow cycle (if any more attempts are left) beginning
        with the first Parameters object in self._connection_configs. If out of
        attempts, report `AMQPConnectionWorkflowFailed`.

        :param bool first: if True, don't delay; otherwise delay next attempt by
            `self._retry_pause` seconds.
        i    u$   AMQP connection workflow failed: %r.Ni   uQ   Beginning a new AMQP connection workflow cycle; attempts remaining after this: %s(   u   Noneu	   _task_refu   _attempts_remainingu   AssertionErroru   AMQPConnectionWorkflowFailedu   _connection_errorsu   _LOGu   erroru   _report_completion_and_cleanupu   debugu   _current_config_indexu   _nbiou
   call_lateru   _retry_pauseu   _try_next_config_async(   u   selfu   firstu   error(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   _start_new_cycle_asyncÜ  s    	
		c             C   sí   d |  _ |  j d k r$ d |  _ n |  j d 7_ |  j t |  j ƒ k rl t j d ƒ |  j d d ƒ d S|  j |  j } t j d | j	 | j
 ƒ |  j d k sª t ‚ |  j j d | j	 d | j
 d	 |  j d
 |  j d |  j ƒ |  _ d S(   uw   Attempt to connect using the next Parameters config. If there are no
        more configs, start a new cycle.

        i    i   u-   _try_next_config_async: starting a new cycle.u   firstNu   _try_next_config_async: %r:%su   hostu   portu   socktypeu   protou   on_doneF(   u   Noneu	   _task_refu   _current_config_indexu   lenu   _connection_configsu   _LOGu   debugu   _start_new_cycle_asyncu   Falseu   hostu   portu   AssertionErroru   _nbiou   getaddrinfou
   _SOCK_TYPEu   _IPPROTOu   _on_getaddrinfo_async_done(   u   selfu   params(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   _try_next_config_asyncø  s"    					c             C   s   d |  _ t | t ƒ rL t j d | ƒ |  j j | ƒ |  j d d ƒ d St j
 d t | ƒ ƒ t | ƒ |  _ |  j ƒ  d S(   uä   Handles completion callback from asynchronous `getaddrinfo()`.

        :param list | BaseException addrinfos_or_exc: resolved address records
            returned by `getaddrinfo()` or an exception object from failure.
        u   getaddrinfo failed: %r.u   firstNu   getaddrinfo returned %s recordsF(   u   Noneu	   _task_refu
   isinstanceu   BaseExceptionu   _LOGu   erroru   _connection_errorsu   appendu   _start_new_cycle_asyncu   Falseu   debugu   lenu   iteru   _addrinfo_iteru   _try_next_resolved_address(   u   selfu   addrinfos_or_exc(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   _on_getaddrinfo_async_done  s    	c             C   s’   y t  |  j ƒ } Wn- t k
 rB t j d ƒ |  j ƒ  d SYn Xt j d | ƒ |  j ƒ  |  _ |  j j d | d |  j	 |  j
 d |  j ƒ d S(   u}   Try connecting using next resolved address. If there aren't any left,
        continue with next Parameters config.

        u8   _try_next_resolved_address: continuing with next config.Nu-   Attempting to connect using address record %ru   addr_recordu   conn_paramsu   on_done(   u   nextu   _addrinfo_iteru   StopIterationu   _LOGu   debugu   _try_next_config_asyncu   _connector_factoryu
   _connectoru   startu   _connection_configsu   _current_config_indexu   _on_connector_done(   u   selfu   addr_record(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   _try_next_resolved_address)  s    
	c             C   s  d |  _ t j d | ƒ t | t ƒ rö |  j j | ƒ t | t ƒ r„ |  j	 |  j
 k sq t d j |  j	 ƒ ƒ ‚ |  j t ƒ  ƒ q|  j ré t | t ƒ ré t j d ƒ t | j t j j ƒ rÊ t } n t |  j ƒ } |  j | ƒ q|  j ƒ  n |  j | ƒ d S(   uÌ   Handle completion of connection attempt by `AMQPConnector`.

        :param pika.connection.Connection | BaseException conn_or_exc: See
            `AMQPConnector.start()` for exception details.

        u$   Connection attempt completed with %ru&   Expected _STATE_ABORTING, but got {!r}uc   Ending AMQP connection workflow after first failed AMQP handshake due to _until_first_amqp_attempt.N(   u   Noneu
   _connectoru   _LOGu   debugu
   isinstanceu   BaseExceptionu   _connection_errorsu   appendu   AMQPConnectorAbortedu   _stateu   _STATE_ABORTINGu   AssertionErroru   formatu   _report_completion_and_cleanupu   AMQPConnectionWorkflowAbortedu   _until_first_amqp_attemptu   AMQPConnectorAMQPHandshakeErroru	   exceptionu   pikau
   exceptionsu   ConnectionOpenAbortedu   AMQPConnectionWorkflowFailedu   _try_next_resolved_address(   u   selfu   conn_or_excu   error(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   _on_connector_done?  s(    				NF(   u   __name__u
   __module__u   __doc__u   socketu   SOCK_STREAMu
   _SOCK_TYPEu   IPPROTO_TCPu   _IPPROTOu   _STATE_INITu   _STATE_ACTIVEu   _STATE_ABORTINGu   _STATE_DONEu   Falseu   __init__u   set_io_servicesu   startu   abortu   _closeu   _deactivateu   _report_completion_and_cleanupu   _start_new_cycle_asyncu   _try_next_config_asyncu   _on_getaddrinfo_async_doneu   _try_next_resolved_addressu   _on_connector_done(   u
   __locals__(    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   AMQPConnectionWorkflow  s&   
		*		,								u   AMQPConnectionWorkflow(   u   __doc__u	   functoolsu   loggingu   socketu   pika.compatu   pikau   pika.exceptionsu   pika.tcp_socket_optsu   __version__u	   getLoggeru   __name__u   _LOGu	   Exceptionu   AMQPConnectorExceptionu   AMQPConnectorStackTimeoutu   AMQPConnectorAbortedu   AMQPConnectorWrongStateu   AMQPConnectorPhaseErrorBaseu   AMQPConnectorSocketConnectErroru    AMQPConnectorTransportSetupErroru   AMQPConnectorAMQPHandshakeErroru   AMQPConnectionWorkflowAbortedu    AMQPConnectionWorkflowWrongStateu   AMQPConnectionWorkflowFailedu   objectu   AMQPConnectoru   compatu   AbstractBaseu   AbstractAMQPConnectionWorkflowu   AMQPConnectionWorkflow(    (    (    uL   /srv/kernel/kteam-tools/dashboard/pika/adapters/utils/connection_workflow.pyu   <module>   s.   ÿ ƒ-