o
    >h                     @   s  U d Z ddlmZ ddlmZ ddlmZ ddlmZm	Z	 ddl
mZ ddlmZmZmZ ddlmZ dd	lmZmZ dd
lmZmZ ddlmZ ddlmZ ddlmZ dZee ed< edr~ddl m!Z! ddl"mZm#Z#m$Z$ ddl%m&Z& ddl'm(Z( nG dd dZ#G dd dZ$G dd de$j)Z*G dd de$j)Z+G dd de$j)Z,G dd  d e#j-Z.eeG d!d" d"Z/eeG d#d$ d$Z0eeG d%d& d&Z1eeG d'd( d(Z2G d)d* d*ej3Z4G d+d, d,ej3Z5G d-d. d.ej3Z6G d/d0 d0ej3Z7dS )1zT
Tests for the implementation of the ssh-userauth service.

Maintainer: Paul Swartz
    )
ModuleType)Optional)implementer)
ConchErrorValidPublicKey)ICredentialsChecker)
IAnonymousISSHPrivateKeyIUsernamePassword)UnauthorizedLogin)IRealmPortal)defertask)loopback)requireModule)unittestNkeyscryptography)SSHProtocolChecker)r   	transportuserauth)NS)keydatac                   @      e Zd ZG dd dZdS )r   c                   @      e Zd ZdZdS )ztransport.SSHTransportBaseQ
            A stub class so that later class definitions won't die.
            N__name__
__module____qualname____doc__ r"   r"   /var/www/vedio/testing/chatpythonscript.ninositsolution.com/env/lib/python3.10/site-packages/twisted/conch/test/test_userauth.pySSHTransportBase"       r$   N)r   r   r    r$   r"   r"   r"   r#   r   !       r   c                   @   r   )r   c                   @   r   )zuserauth.SSHUserAuthClientr   Nr   r"   r"   r"   r#   SSHUserAuthClient(   r%   r'   N)r   r   r    r'   r"   r"   r"   r#   r   '   r&   r   c                   @   s2   e Zd ZdZdd Zdd ZdddZd	d
 ZdS )ClientUserAuthz"
    A mock user auth client.
    c                 C   s(   | j r
tjtjS ttjtjS )z
        If this is the first time we've been called, return a blob for
        the DSA key.  Otherwise, return a blob
        for the RSA key.
        )	lastPublicKeyr   Key
fromStringr   publicRSA_opensshr   succeedpublicDSA_opensshselfr"   r"   r#   getPublicKey3   s   zClientUserAuth.getPublicKeyc                 C   s   t tjtjS )z@
        Return the private key object for the RSA key.
        )r   r-   r   r*   r+   r   privateRSA_opensshr/   r"   r"   r#   getPrivateKey>      zClientUserAuth.getPrivateKeyNc                 C   
   t dS )z/
        Return 'foo' as the password.
           foor   r-   )r0   promptr"   r"   r#   getPasswordD      
zClientUserAuth.getPasswordc                 C   r5   )z>
        Return 'foo' as the answer to two questions.
        )foor;   r7   )r0   nameinformationanswersr"   r"   r#   getGenericAnswersJ   r:   z ClientUserAuth.getGenericAnswersN)r   r   r    r!   r1   r3   r9   r?   r"   r"   r"   r#   r(   .   s    
r(   c                   @       e Zd ZdZdd Zdd ZdS )OldClientAuthz~
    The old SSHUserAuthClient returned a cryptography key object from
    getPrivateKey() and a string from getPublicKey
    c                 C   s   t tjtjjS r@   )r   r-   r   r*   r+   r   r2   	keyObjectr/   r"   r"   r#   r3   W      zOldClientAuth.getPrivateKeyc                 C   s   t jtj S r@   )r   r*   r+   r   r,   blobr/   r"   r"   r#   r1   Z   s   zOldClientAuth.getPublicKeyNr   r   r    r!   r3   r1   r"   r"   r"   r#   rB   Q   s    rB   c                   @   rA   )ClientAuthWithoutPrivateKeyzP
    This client doesn't have a private key, but it does have a public key.
    c                 C      d S r@   r"   r/   r"   r"   r#   r3   c      z)ClientAuthWithoutPrivateKey.getPrivateKeyc                 C   s   t jtjS r@   )r   r*   r+   r   r,   r/   r"   r"   r#   r1   f      z(ClientAuthWithoutPrivateKey.getPublicKeyNrF   r"   r"   r"   r#   rG   ^   s    rG   c                   @   sL   e Zd ZdZG dd dZG dd dZdd Zdd	 Zd
d Zdd Z	dS )FakeTransporta_  
    L{userauth.SSHUserAuthServer} expects an SSH transport which has a factory
    attribute which has a portal attribute. Because the portal is important for
    testing authentication, we need to be able to provide an interesting portal
    object to the L{SSHUserAuthServer}.

    In addition, we want to be able to capture any packets sent over the
    transport.

    @ivar packets: a list of 2-tuples: (messageType, data).  Each 2-tuple is
        a sent packet.
    @type packets: C{list}
    @param lostConnecion: True if loseConnection has been called on us.
    @type lostConnection: L{bool}
    c                   @   s   e Zd ZdZdZdd ZdS )zFakeTransport.ServicezW
        A mock service, representing the other service offered by the server.
           nancyc                 C   rH   r@   r"   r/   r"   r"   r#   serviceStarted   rI   z$FakeTransport.Service.serviceStartedN)r   r   r    r!   r<   rM   r"   r"   r"   r#   Service{   s    rN   c                   @      e Zd ZdZdd ZdS )zFakeTransport.Factoryzg
        A mock factory, representing the factory that spawned this user auth
        service.
        c                 C   s   |dkrt jS dS )z2
            Return our fake service.
               noneN)rK   rN   )r0   r   servicer"   r"   r#   
getService   s   z FakeTransport.Factory.getServiceN)r   r   r    r!   rR   r"   r"   r"   r#   Factory   s    rS   c                 C   s(   |   | _|| j_d| _| | _g | _d S NF)rS   factoryportallostConnectionr   packets)r0   rV   r"   r"   r#   __init__   s
   

zFakeTransport.__init__c                 C   s   | j ||f dS )z8
        Record the packet sent by the service.
        N)rX   append)r0   messageTypemessager"   r"   r#   
sendPacket   r4   zFakeTransport.sendPacketc                 C      dS )z
        Pretend that this transport encrypts traffic in both directions. The
        SSHUserAuthServer disables password authentication if the transport
        isn't encrypted.
        Tr"   )r0   	directionr"   r"   r#   isEncrypted   s   zFakeTransport.isEncryptedc                 C   s
   d| _ d S NT)rW   r/   r"   r"   r#   loseConnection   s   
zFakeTransport.loseConnectionN)
r   r   r    r!   rN   rS   rY   r]   r`   rb   r"   r"   r"   r#   rK   j   s    
rK   c                   @   rO   )Realmz
    A mock realm for testing L{userauth.SSHUserAuthServer}.

    This realm is not actually used in the course of testing, so it returns the
    simplest thing that could possibly work.
    c                 G   s   t |d d dd fS )Nr   c                   S   rH   r@   r"   r"   r"   r"   r#   <lambda>       z%Realm.requestAvatar.<locals>.<lambda>r7   )r0   avatarIdmind
interfacesr"   r"   r#   requestAvatar   s   zRealm.requestAvatarN)r   r   r    r!   ri   r"   r"   r"   r#   rc      s    rc   c                   @      e Zd ZdZefZdd ZdS )PasswordCheckerz
    A very simple username/password checker which authenticates anyone whose
    password matches their username and rejects all others.
    c                 C   s&   |j |jkrt|j S ttdS )NzInvalid username/password pair)usernamepasswordr   r-   failr   )r0   credsr"   r"   r#   requestAvatarId   s   zPasswordChecker.requestAvatarIdN)r   r   r    r!   r
   credentialInterfacesrp   r"   r"   r"   r#   rk          rk   c                   @   rj   )PrivateKeyCheckerz
    A very simple public key checker which authenticates anyone whose
    public/private keypair is the same keydata.public/privateRSA_openssh.
    c                 C   sX   |j tjtj  kr)|jd ur&tj|j }||j|jr#|j	S t t
 t r@   )rE   r   r*   r+   r   r,   	signatureverifysigDatarl   r   r   )r0   ro   objr"   r"   r#   rp      s   
z!PrivateKeyChecker.requestAvatarIdN)r   r   r    r!   r	   rq   rp   r"   r"   r"   r#   rs      rr   rs   c                   @   rj   )AnonymousCheckerzI
    A simple checker which isn't supported by L{SSHUserAuthServer}.
    c                 C   rH   r@   r"   )r0   credentialsr"   r"   r#   rp      s   z AnonymousChecker.requestAvatarIdN)r   r   r    r!   r   rq   rp   r"   r"   r"   r#   rx      s    rx   c                   @   s   e Zd ZdZedu rdZdd Zdd Zdd	 Zd
d Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zd d! Zd"d# Zd$d% Zd&d' Zd(d) Zd*d+ ZdS ),SSHUserAuthServerTestsz&
    Tests for SSHUserAuthServer.
    Ncannot run without cryptographyc                 C   sb   t  | _t| j| _| jt  | jt  t | _	t
| j| j	_| j	  | j	j  d S r@   )rc   realmr   rV   registerCheckerrk   rs   r   SSHUserAuthServer
authServerrK   r   rM   supportedAuthenticationssortr/   r"   r"   r#   setUp   s   

zSSHUserAuthServerTests.setUpc                 C      | j   d | _ d S r@   )r   serviceStoppedr/   r"   r"   r#   tearDown      

zSSHUserAuthServerTests.tearDownc                 C   s(   |  | jjjd tjtdd f dS )z;
        Check that the authentication has failed.
        s   password,publickey    N)assertEqualr   r   rX   r   MSG_USERAUTH_FAILUREr   r0   ignoredr"   r"   r#   _checkFailed   s   z#SSHUserAuthServerTests._checkFailedc                 C   s,   | j tdtd td }|| jS )z
        A client may request a list of authentication 'method name' values
        that may continue by using the "none" authentication 'method name'.

        See RFC 4252 Section 5.2.
        r6   s   servicerP   )r   ssh_USERAUTH_REQUESTr   addCallbackr   )r0   dr"   r"   r#   test_noneAuthentication  s   z.SSHUserAuthServerTests.test_noneAuthenticationc                    sF   d tdtdtddtdg} j|} fdd}||S )z
        When provided with correct password authentication information, the
        server should respond by sending a MSG_USERAUTH_SUCCESS message with
        no other data.

        See RFC 4252, Section 5.1.
            r6   rP      passwordr   c                          jjjtjdfg d S Nr   r   r   r   rX   r   MSG_USERAUTH_SUCCESSr   r/   r"   r#   check     
zKSSHUserAuthServerTests.test_successfulPasswordAuthentication.<locals>.check)joinr   r   r   r   r0   packetr   r   r"   r/   r#   %test_successfulPasswordAuthentication  s   $
z<SSHUserAuthServerTests.test_successfulPasswordAuthenticationc                 C   sh   d tdtdtddtdg}t | j_| j|}| | jjj	g  | jj
d || jS )a;  
        When provided with invalid authentication details, the server should
        respond by sending a MSG_USERAUTH_FAILURE message which states whether
        the authentication was partially successful, and provides other, open
        options for authentication.

        See RFC 4252, Section 5.1.
        r   r6   rP   r   r      bar   )r   r   r   Clockr   clockr   r   r   rX   advancer   r   r0   r   r   r"   r"   r#   !test_failedPasswordAuthentication'  s   $
z8SSHUserAuthServerTests.test_failedPasswordAuthenticationc                    s   t jtj }t jtj}tdtd td d t|  t| }d j	j
_|tdttjf | }|t|7 } j	|} fdd}||S )zN
        Test that private key authentication completes successfully,
        r6   rP   	   publickey      testc                    r   r   r   r   r/   r"   r#   r   M  r   zMSSHUserAuthServerTests.test_successfulPrivateKeyAuthentication.<locals>.check)r   r*   r+   r   r,   rE   r2   r   sshTyper   r   	sessionIDsignbytesr   MSG_USERAUTH_REQUESTr   r   )r0   rE   rw   r   rt   r   r   r"   r/   r#   'test_successfulPrivateKeyAuthentication8  s,   


z>SSHUserAuthServerTests.test_successfulPrivateKeyAuthenticationc                    s   t   dd }dd } fdd}| | jd| | | jd| | | jd	| td
td td td }| j| |  tS )z
        ssh_USERAUTH_REQUEST should raise a ConchError if tryAuth returns
        None. Added to catch a bug noticed by pyflakes.
        c                 S   s   |  d d S )Nz&request should have raised ConochError)rn   r   r"   r"   r#   mockCbFinishedAuth\  rJ   zOSSHUserAuthServerTests.test_requestRaisesConchError.<locals>.mockCbFinishedAuthc                 S   rH   r@   r"   )kinduserdatar"   r"   r#   mockTryAuth_  rI   zHSSHUserAuthServerTests.test_requestRaisesConchError.<locals>.mockTryAuthc                    s     | j d S r@   )errbackvalue)reasonr   r"   r#   mockEbBadAuthb  s   zJSSHUserAuthServerTests.test_requestRaisesConchError.<locals>.mockEbBadAuthtryAuth_cbFinishedAuth
_ebBadAuths   userrP   s
   public-keys   data)r   Deferredpatchr   r   r   assertFailurer   )r0   r   r   r   r   r"   r   r#   test_requestRaisesConchErrorU  s    z3SSHUserAuthServerTests.test_requestRaisesConchErrorc                    sb   t jtj  tdtd td d td t  }j|} fdd}|	|S )z@
        Test that verifying a valid private key works.
        r6   rP   r   r      ssh-rsac                    s*    jjjtjtdt  fg d S )Nr   )r   r   r   rX   r   MSG_USERAUTH_PK_OKr   r   rE   r0   r"   r#   r   ~  s   z@SSHUserAuthServerTests.test_verifyValidPrivateKey.<locals>.check)
r   r*   r+   r   r,   rE   r   r   r   r   r   r"   r   r#   test_verifyValidPrivateKeyo  s    
z1SSHUserAuthServerTests.test_verifyValidPrivateKeyc                 C   sV   t jtj }tdtd td d td t| }| j|}|	| j
S )d
        Test that private key authentication fails when the public key
        is invalid.
        r6   rP   r   r   s   ssh-dsar   r*   r+   r   r.   rE   r   r   r   r   r   r0   rE   r   r   r"   r"   r#   3test_failedPrivateKeyAuthenticationWithoutSignature  s   zJSSHUserAuthServerTests.test_failedPrivateKeyAuthenticationWithoutSignaturec                 C   s|   t jtj }t jtj}tdtd td d td t| t|| }d| j	j
_| j	|}|| jS )r   r6   rP   r   r   r   r   )r   r*   r+   r   r,   rE   r2   r   r   r   r   r   r   r   r   )r0   rE   rw   r   r   r"   r"   r#   0test_failedPrivateKeyAuthenticationWithSignature  s&   
	zGSSHUserAuthServerTests.test_failedPrivateKeyAuthenticationWithSignaturec                 C   sj   t jtj }td|dd  }tdtd td d td t| }| j|}|	| j
S )	z
        Private key authentication fails when the public key type is
        unsupported or the public key is corrupt.
        s   ssh-bad-type   Nr6   rP   r   r   r   r   r   r"   r"   r#   test_unsupported_publickey  s    z1SSHUserAuthServerTests.test_unsupported_publickeyc                 C   sR   t  }t| j|_| jt  |  |  |j	
  | |j	ddg dS )ah  
        L{SSHUserAuthServer} sets up
        C{SSHUserAuthServer.supportedAuthentications} by checking the portal's
        credentials interfaces and mapping them to SSH authentication method
        strings.  If the Portal advertises an interface that
        L{SSHUserAuthServer} can't map, it should be ignored.  This is a white
        box test.
        r   r   N)r   r~   rK   rV   r   r}   rx   rM   r   r   r   r   r0   serverr"   r"   r#    test_ignoreUnknownCredInterfaces  s   	
z7SSHUserAuthServerTests.test_ignoreUnknownCredInterfacesc                 C   s   |  d| jj t }t| j|_dd |j_|	  |
  | d|j t }t| j|_dd |j_|	  |
  |  d|j dS )z
        Test that the userauth service does not advertise password
        authentication if the password would be send in cleartext.
        r   c                 S   r^   rT   r"   xr"   r"   r#   rd     re   zISSHUserAuthServerTests.test_removePasswordIfUnencrypted.<locals>.<lambda>c                 S      | dkS Ninr"   r   r"   r"   r#   rd         N)assertInr   r   r   r~   rK   rV   r   r`   rM   r   assertNotIn)r0   clearAuthServerhalfAuthServerr"   r"   r#    test_removePasswordIfUnencrypted  s   z7SSHUserAuthServerTests.test_removePasswordIfUnencryptedc                 C   s   t | j}|t  t }t||_dd |j_|	  |
  | |jdg t }t||_dd |j_|	  |
  | |jdg dS )z
        If the L{SSHUserAuthServer} is not advertising passwords, then an
        unencrypted connection should not cause any warnings or exceptions.
        This is a white box test.
        c                 S   r^   rT   r"   r   r"   r"   r#   rd     re   zSSSHUserAuthServerTests.test_unencryptedConnectionWithoutPasswords.<locals>.<lambda>r   c                 S   r   r   r"   r   r"   r"   r#   rd     r   N)r   r|   r}   rs   r   r~   rK   r   r`   rM   r   r   r   )r0   rV   r   r   r"   r"   r#   *test_unencryptedConnectionWithoutPasswords  s   


zASSHUserAuthServerTests.test_unencryptedConnectionWithoutPasswordsc                 C   s   t  }t |_t| j|_|  |j	d |
  | |jjtjdttjf td td fg | |jj dS )z0
        Test that the login times out.
        鰚        s   you took too longr   N)r   r~   r   r   r   rK   rV   r   rM   r   r   r   rX   MSG_DISCONNECTr   )DISCONNECT_NO_MORE_AUTH_METHODS_AVAILABLEr   
assertTruerW   r0   timeoutAuthServerr"   r"   r#   test_loginTimeout  s(   

z(SSHUserAuthServerTests.test_loginTimeoutc                 C   s\   t  }t |_t| j|_|  |	  |j
d | |jjg  | |jj dS )zN
        Test that stopping the service also stops the login timeout.
        r   N)r   r~   r   r   r   rK   rV   r   rM   r   r   r   rX   assertFalserW   r   r"   r"   r#   test_cancelLoginTimeout  s   
z.SSHUserAuthServerTests.test_cancelLoginTimeoutc                    sn   d tdtdtddtdg}t  j_tdD ]} j|} jjd q fd	d
}|	|S )zm
        Test that the server disconnects if the client fails authentication
        too many times.
        r   r6   rP   r   r   r      r   c                    s<      jjjd tjdttjf td td f d S )Nr   r   s   too many bad authsr   )r   r   r   rX   r   r   r   r   r   r/   r"   r#   r   1  s   
z:SSHUserAuthServerTests.test_tooManyAttempts.<locals>.check)
r   r   r   r   r   r   ranger   r   r   )r0   r   ir   r   r"   r/   r#   test_tooManyAttempts&  s   $
z+SSHUserAuthServerTests.test_tooManyAttemptsc                 C   sH   t dt d t d d t d }t | j_| j|}|| jS )zo
        If the user requests a service that we don't support, the
        authentication should fail.
        r6   r   r   r   )r   r   r   r   r   r   r   r   r   r"   r"   r#   test_failIfUnknownService?  s   $z0SSHUserAuthServerTests.test_failIfUnknownServicec                    sV   dd }   jd|    jdd  fdd} jddd} |t|S )	aZ  
        tryAuth() has two edge cases that are difficult to reach.

        1) an authentication method auth_* returns None instead of a Deferred.
        2) an authentication type that is defined does not have a matching
           auth_* method.

        Both these cases should return a Deferred which fails with a
        ConchError.
        c                 S   rH   r@   r"   )r   r"   r"   r#   mockAuthU  rI   z>SSHUserAuthServerTests.test_tryAuthEdgeCases.<locals>.mockAuthauth_publickeyauth_passwordNc                    s    j dd d } |tS )Nr   )r   r   r   r   )r   d2r/   r"   r#   
secondTest[  s   z@SSHUserAuthServerTests.test_tryAuthEdgeCases.<locals>.secondTestr   )r   r   r   r   r   r   )r0   r   r   d1r"   r/   r#   test_tryAuthEdgeCasesI  s   z,SSHUserAuthServerTests.test_tryAuthEdgeCases)r   r   r    r!   r   skipr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r"   r"   r"   r#   rz      s0    	
rz   c                   @   s   e Zd ZdZedu rdZdd Zdd Zdd	 Zd
d Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zd d! ZdS )"SSHUserAuthClientTestsz&
    Tests for SSHUserAuthClient.
    Nr{   c                 C   s4   t dt | _td | j_d| jj_| j  d S )Nr6   r   )r(   rK   rN   
authClientr   r   rM   r/   r"   r"   r#   r   k  s   
zSSHUserAuthClientTests.setUpc                 C   r   r@   )r   r   r/   r"   r"   r#   r   q  r   zSSHUserAuthClientTests.tearDownc                 C   sT   |  | jjd |  | jjjd |  | jjjtjt	dt	d t	d fg dS )z;
        Test that client is initialized properly.
        r6   rL   rP   N)
r   r   r   instancer<   r   rX   r   r   r   r/   r"   r"   r#   	test_initu  s   z SSHUserAuthClientTests.test_initc                    s@   dg  fdd}|| j j_| j d |  d | j j dS )z9
        Test that the client succeeds properly.
        Nc                    s   |  d< d S )Nr   r"   )rQ   r   r"   r#   stubSetService  s   zDSSHUserAuthClientTests.test_USERAUTH_SUCCESS.<locals>.stubSetServicer   r   )r   r   
setServicessh_USERAUTH_SUCCESSr   r   )r0   r   r"   r   r#   test_USERAUTH_SUCCESS  s
   
z,SSHUserAuthClientTests.test_USERAUTH_SUCCESSc              	   C   s  | j tdd  | | j jjd tjtdtd td d td ttj	
tj  f | j tdd  ttj	
tj }| | j jjd tjtdtd td d td | f | j tdttj	
tj   t| j jjttjf td td td d td | }tj	
tj}| | j jjd tjtdtd td d td | t|| f d	S )
zJ
        Test that the client can authenticate with a public key.
        r   r   r   r6   rL   s   ssh-dssr      N)r   ssh_USERAUTH_FAILUREr   r   r   rX   r   r   r   r*   r+   r   r.   rE   r,   ssh_USERAUTH_PK_OKr   r   r2   r   )r0   rE   rv   rw   r"   r"   r#   test_publickey  s   

z%SSHUserAuthClientTests.test_publickeyc                 C   sz   t dt }td|_d|j_|  |d g |j_| |	d | 
|jjtjtdtd td fg dS )z
        If the SSHUserAuthClient doesn't return anything from signData,
        the client should start the authentication over again by requesting
        'none' authentication.
        r6   Nr   r   r   rL   rP   )rG   rK   rN   r   r   rM   r   rX   assertIsNoner   r   r   r   r   )r0   r   r"   r"   r#   !test_publickey_without_privatekey  s   

z8SSHUserAuthClientTests.test_publickey_without_privatekeyc                    s.   dd  j _ j d} fdd}||S )z{
        If there's no public key, auth_publickey should return a Deferred
        called back with a False value.
        c                 S   rH   r@   r"   r   r"   r"   r#   rd     re   z:SSHUserAuthClientTests.test_no_publickey.<locals>.<lambda>r   c                    s     |  d S r@   )r   resultr/   r"   r#   r     rJ   z7SSHUserAuthClientTests.test_no_publickey.<locals>.check)r   r1   r   r   )r0   r   r   r"   r/   r#   test_no_publickey  s   
z(SSHUserAuthClientTests.test_no_publickeyc                 C   s   | j tdd  | | j jjd tjtdtd td d td f | j tdtd  | | j jjd tjtdtd td d tdd  f d	S )
zx
        Test that the client can authentication with a password.  This
        includes changing the password.
        r   r   r   r6   rL   r   r   r   N)	r   r   r   r   r   rX   r   r   r   r/   r"   r"   r#   test_password  s   "&z$SSHUserAuthClientTests.test_passwordc                 C   s"   dd | j _| | j d dS )zK
        If getPassword returns None, tryAuth should return False.
        c                   S   rH   r@   r"   r"   r"   r"   r#   rd     re   z9SSHUserAuthClientTests.test_no_password.<locals>.<lambda>r   N)r   r9   r   r   r/   r"   r"   r#   test_no_password  s   z'SSHUserAuthClientTests.test_no_passwordc                 C   s`   | j tdtd td d td d  | | j jjd tjdtd td f dS )	zj
        Make sure that the client can authenticate with the keyboard
        interactive method.
        r   s      s
   Password: r   r   s      r6   N)r   'ssh_USERAUTH_PK_OK_keyboard_interactiver   r   r   rX   r   MSG_USERAUTH_INFO_RESPONSEr/   r"   r"   r#   test_keyboardInteractive  s&   z/SSHUserAuthClientTests.test_keyboardInteractivec                 C   sP   d| j _g | j j_| j d | | j jjtjtdtd td fg dS )z
        If C{SSHUserAuthClient} gets a MSG_USERAUTH_PK_OK packet when it's not
        expecting it, it should fail the current authentication and move on to
        the next type.
        s   unknownr   r6   rL   rP   N)	r   lastAuthr   rX   r   r   r   r   r   r/   r"   r"   r#   "test_USERAUTH_PK_OK_unknown_method  s   
z9SSHUserAuthClientTests.test_USERAUTH_PK_OK_unknown_methodc                    s    fdd} fdd}| j _| j _ j tdd    j jjd tj	tdtd	 td
 d td f  j tdd    j jjdd ddg dS )z
        ssh_USERAUTH_FAILURE should sort the methods by their position
        in SSHUserAuthClient.preferredOrder.  Methods that are not in
        preferredOrder should be sorted at the end of that list.
        c                      s    j jdd d S )N      here is datar   r   r]   r"   r/   r"   r#   auth_firstmethod2  s   zNSSHUserAuthClientTests.test_USERAUTH_FAILURE_sorting.<locals>.auth_firstmethodc                      s    j jdd dS )N   
   other dataTr  r"   r/   r"   r#   auth_anothermethod5  s   zPSSHUserAuthClientTests.test_USERAUTH_FAILURE_sorting.<locals>.auth_anothermethods   anothermethod,passwordr   r   r6   rL   r   s"   firstmethod,anothermethod,passwordr   N)r
  r  )r  r  )
r   r  r  r   r   r   r   rX   r   r   )r0   r  r  r"   r/   r#   test_USERAUTH_FAILURE_sorting+  s$   "
z4SSHUserAuthClientTests.test_USERAUTH_FAILURE_sortingc                 C   sT   | j tdd  | j tdd  | | j jjd tjdtd d f dS )	z
        If there are no more available user authentication messages,
        the SSHUserAuthClient should disconnect with code
        DISCONNECT_NO_MORE_AUTH_METHODS_AVAILABLE.
        r   r   r   r   s      s(   no more authentication methods availables       N)r   r   r   r   r   rX   r   r/   r"   r"   r#   %test_disconnectIfNoMoreAuthenticationO  s   z<SSHUserAuthClientTests.test_disconnectIfNoMoreAuthenticationc                 C   sH   g | j j_| j d | | j jjtjtdtd td fg dS )z
        _ebAuth (the generic authentication error handler) should send
        a request for the 'none' authentication method.
        Nr6   rL   rP   )r   r   rX   _ebAuthr   r   r   r   r/   r"   r"   r#   test_ebAutha  s   
z"SSHUserAuthClientTests.test_ebAuthc                    s`   t dt      fdd} fdddd   }|j	|S )z
        getPublicKey() should return None.  getPrivateKey() should return a
        failed Deferred.  getPassword() should return a failed Deferred.
        getGenericAnswers() should return a failed Deferred.
        r6   c                    s$   |  t   }|jS r@   )trapNotImplementedErrorr9   r   rn   
addErrbackr  r   )r   check2r0   r"   r#   r   v  s   
z3SSHUserAuthClientTests.test_defaults.<locals>.checkc                    s*   |  t  d d d }|jS r@   )r  r  r?   r   rn   r  r  )r   check3r0   r"   r#   r  {  s   
z4SSHUserAuthClientTests.test_defaults.<locals>.check2c                 S   s   |  t d S r@   )r  r  r   r"   r"   r#   r    rJ   z4SSHUserAuthClientTests.test_defaults.<locals>.check3)
r   r'   rK   rN   r   r1   r3   r   rn   r  )r0   r   r   r"   )r   r  r  r0   r#   test_defaultsm  s   z$SSHUserAuthClientTests.test_defaults)r   r   r    r!   r   r   r   r   r   r   r   r   r  r  r  r  r	  r  r  r  r  r"   r"   r"   r#   r   c  s&    >$r   c                   @   s.   e Zd Zedu r
dZG dd dZdd ZdS )LoopbackTestsNr{   c                   @   s"   e Zd ZG dd dZdd ZdS )zLoopbackTests.Factoryc                   @   rA   )zLoopbackTests.Factory.Service   TestServicec                 C   s   | j   d S r@   )r   rb   r/   r"   r"   r#   rM     rJ   z,LoopbackTests.Factory.Service.serviceStartedc                 C   rH   r@   r"   r/   r"   r"   r#   r     rI   z,LoopbackTests.Factory.Service.serviceStoppedN)r   r   r    r<   rM   r   r"   r"   r"   r#   rN     s    rN   c                 C   s   | j S r@   )rN   )r0   avatarr<   r"   r"   r#   rR     s   z LoopbackTests.Factory.getServiceN)r   r   r    rN   rR   r"   r"   r"   r#   rS     s    	rS   c                    s   t  tdj }t _j_dd j_t |_||j_d j_	|j_	dd  j_
|j_
 j_d_t }t|}t   t   t   fdd _|  |jj_tj|j}dd jj_d	d |jj_  |  fd
d}||S )zW
        Test that the userauth server and client play nicely with each other.
        r6   c                 S   r^   ra   r"   r   r"   r"   r#   rd     re   z-LoopbackTests.test_loopback.<locals>.<lambda>r   c                   S   rH   r@   r"   r"   r"   r"   r#   rd     re   r   c                    s   t  j|  dkS )Nr   )lensuccessfulCredentials)aId)checkerr"   r#   rd     s    c                   S   r^   )N_ServerLoopbackr"   r"   r"   r"   r#   rd     re   c                   S   r^   )N_ClientLoopbackr"   r"   r"   r"   r#   rd     re   c                    s     jjjd d S )Nr  )r   r   rQ   r<   r   r   r"   r#   r     rD   z*LoopbackTests.test_loopback.<locals>.check)r   r~   r(   rS   rN   r   r$   rQ   r`   r   sendKexInitrU   passwordDelayrc   r   r   r}   rk   rs   areDonerV   r   loopbackAsync	logPrefixrM   r   )r0   clientr|   rV   r   r   r"   )r#  r0   r   r#   test_loopback  s4   




zLoopbackTests.test_loopback)r   r   r    r   r   rS   r,  r"   r"   r"   r#   r    s
    r  c                   @   s    e Zd Zedu r
dZdd ZdS )ModuleInitializationTestsNr{   c                 C   s,   |  tjjd d |  tjjd d d S )N<   r   )r   r   r~   protocolMessagesr'   r/   r"   r"   r#   test_messages  s   z'ModuleInitializationTests.test_messages)r   r   r    r   r   r0  r"   r"   r"   r#   r-    s    r-  )8r!   typesr   typingr   zope.interfacer   twisted.conch.errorr   r   twisted.cred.checkersr   twisted.cred.credentialsr   r	   r
   twisted.cred.errorr   twisted.cred.portalr   r   twisted.internetr   r   twisted.protocolsr   twisted.python.reflectr   twisted.trialr   r   __annotations__twisted.conch.checkersr   twisted.conch.sshr   r   twisted.conch.ssh.commonr   twisted.conch.testr   r'   r(   rB   rG   r$   rK   rc   rk   rs   rx   TestCaserz   r   r  r-  r"   r"   r"   r#   <module>   sR   #A  }  &;