Package pike :: Package test :: Module negotiate
[hide private]
[frames] | no frames]

Source Code for Module pike.test.negotiate

  1  # 
  2  # Copyright (c) 2013, EMC Corporation 
  3  # All rights reserved. 
  4  # 
  5  # Redistribution and use in source and binary forms, with or without 
  6  # modification, are permitted provided that the following conditions are met: 
  7  # 
  8  # 1. Redistributions of source code must retain the above copyright notice, 
  9  # this list of conditions and the following disclaimer. 
 10  # 2. Redistributions in binary form must reproduce the above copyright notice, 
 11  # this list of conditions and the following disclaimer in the documentation 
 12  # and/or other materials provided with the distribution. 
 13  # 
 14  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
 15  # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
 16  # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
 17  # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 
 18  # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
 19  # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
 20  # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
 21  # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
 22  # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
 23  # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 24  # POSSIBILITY OF SUCH DAMAGE. 
 25  # 
 26  # Module Name: 
 27  # 
 28  #        negotiate.py 
 29  # 
 30  # Abstract: 
 31  # 
 32  #        Tests negotiatate responses when requests are sent with a 
 33  #        variety of bits set 
 34  # 
 35  # Authors: Angela Bartholomaus (angela.bartholomaus@emc.com) 
 36  #          Paul Martin (paul.o.martin@emc.com) 
 37  # 
 38   
 39  import pike.crypto as crypto 
 40  import pike.model as model 
 41  import pike.smb2 as smb2 
 42  import pike.test as test 
 43  import random 
 44  import array 
45 46 -class CapTest(test.PikeTest):
47 - def setup(self):
48 # determine max capability and apply Required decorators 49 self.chan, self.tree = self.tree_connect() 50 self.max_dialect = self.chan.connection.negotiate_response.dialect_revision 51 self.capabilities = self.chan.connection.negotiate_response.capabilities 52 self.share_caps = self.tree.tree_connect_response.capabilities 53 self.chan.logoff() 54 self.chan.connection.close() 55 56 self.client = self.default_client 57 self.conn = self.client.connect(self.server, self.port)
58
59 - def teardown(self):
60 self.conn.close()
61
62 - def negotiate(self, dialect, caps):
63 """Negotiate requested dialect""" 64 self.client.dialects = [dialect, smb2.DIALECT_SMB2_002] 65 self.client.capabilities = caps 66 self.conn.negotiate() 67 68 return self.conn
69
70 - def positive_cap(self, dialect, req_caps=0, exp_caps=None):
71 """Test that cap is advertised by the server if the client advertises it""" 72 if exp_caps is None: 73 exp_caps = req_caps 74 conn = self.negotiate(dialect, req_caps) 75 self.assertEqual(conn.negotiate_response.capabilities & exp_caps, exp_caps)
76
77 - def negative_cap(self, dialect, cap):
78 """Test that cap is not advertised if we don't advertise it""" 79 conn = self.negotiate(dialect, 0) 80 self.assertEqual(conn.negotiate_response.capabilities & cap, 0)
81
82 - def downlevel_cap(self, cap):
83 """Test that cap is not advertised to downlevel dialect even if we advertise it""" 84 conn = self.negotiate(smb2.DIALECT_SMB2_002, cap) 85 self.assertEqual(conn.negotiate_response.capabilities & cap, 0)
86
87 -class CapPersistent(CapTest):
88 # persistent cap is advertised if we ask for it 89 @test.RequireDialect(smb2.DIALECT_SMB3_0) 90 @test.RequireShareCapabilities(smb2.SMB2_SHARE_CAP_CONTINUOUS_AVAILABILITY)
91 - def test_smb3(self):
93 94 # persistent cap is advertised if we ask for lots of caps 95 @test.RequireDialect(smb2.DIALECT_SMB3_0) 96 @test.RequireShareCapabilities(smb2.SMB2_SHARE_CAP_CONTINUOUS_AVAILABILITY) 106 107 # persistent cap is not advertised if we don't advertise it
108 - def test_smb3_no_advert(self):
110 111 # persistent cap is not advertised if we don't advertise it (SMB 2.1)
112 - def test_downlevel_no_advert(self):
114 115 # persistent cap is not advertised to downlevel client
116 - def test_downlevel(self):
118
119 -class CapMultichannel(CapTest):
120 # multichannel cap is advertised if we ask for it 121 @test.RequireDialect(smb2.DIALECT_SMB3_0)
122 - def test_smb3(self):
124 125 # multichannel cap is advertised if we ask for lots of caps 126 @test.RequireDialect(smb2.DIALECT_SMB3_0) 136 137 # multichannel cap is not advertised if we don't advertise it
138 - def test_smb3_no_advert(self):
140 141 # multichannel cap is not advertised if we don't advertise it (SMB 2.1)
142 - def test_downlevel_no_advert(self):
144 145 # multichannel cap is not advertised to downlevel client
146 - def test_downlevel(self):
148
149 -class CapMulticredit(CapTest):
150 # largemtu cap is advertised if we ask for it 151 @test.RequireDialect(smb2.DIALECT_SMB3_0)
152 - def test_smb3(self):
154 155 # largemtu cap is advertised if we ask for lots of caps 156 @test.RequireDialect(smb2.DIALECT_SMB3_0) 166 167 # largemtu cap is advertised if we ask for it
168 - def test_smb21(self):
170 171 # multicredit cap is not advertised to downlevel client
172 - def test_downlevel(self):
174 175 @test.RequireDialect(smb2.DIALECT_SMB3_1_1)
176 -class NegotiateContext(test.PikeTest):
177 - def negotiate(self, *args, **kwds):
178 self.client = self.default_client 179 self.conn = self.client.connect(self.server, self.port) 180 return self.conn.negotiate(*args, **kwds).negotiate_response
181
183 resp = self.negotiate(hash_algorithms=[smb2.SMB2_SHA_512]) 184 if resp.dialect_revision >= smb2.DIALECT_SMB3_1_1: 185 for ctx in resp: 186 if isinstance(ctx, smb2.PreauthIntegrityCapabilitiesResponse): 187 self.assertIn(smb2.SMB2_SHA_512, ctx.hash_algorithms) 188 self.assertGreater(len(ctx.salt), 0)
189
191 resp = self.negotiate(ciphers=[crypto.SMB2_AES_128_CCM, crypto.SMB2_AES_128_GCM]) 192 if resp.dialect_revision >= smb2.DIALECT_SMB3_1_1: 193 for ctx in resp: 194 if isinstance(ctx, crypto.EncryptionCapabilitiesResponse): 195 self.assertEqual(len(ctx.ciphers), 1) 196 self.assertIn(crypto.SMB2_AES_128_CCM, ctx.ciphers)
197
199 resp = self.negotiate(ciphers=[crypto.SMB2_AES_128_GCM, crypto.SMB2_AES_128_CCM]) 200 if resp.dialect_revision >= smb2.DIALECT_SMB3_1_1: 201 for ctx in resp: 202 if isinstance(ctx, crypto.EncryptionCapabilitiesResponse): 203 self.assertEqual(len(ctx.ciphers), 1) 204 self.assertIn(crypto.SMB2_AES_128_GCM, ctx.ciphers)
205
207 resp = self.negotiate(ciphers=[crypto.SMB2_AES_128_CCM]) 208 if resp.dialect_revision >= smb2.DIALECT_SMB3_1_1: 209 for ctx in resp: 210 if isinstance(ctx, crypto.EncryptionCapabilitiesResponse): 211 self.assertEqual(len(ctx.ciphers), 1) 212 self.assertIn(crypto.SMB2_AES_128_CCM, ctx.ciphers)
213
215 resp = self.negotiate(ciphers=[crypto.SMB2_AES_128_GCM]) 216 if resp.dialect_revision >= smb2.DIALECT_SMB3_1_1: 217 for ctx in resp: 218 if isinstance(ctx, crypto.EncryptionCapabilitiesResponse): 219 self.assertEqual(len(ctx.ciphers), 1) 220 self.assertIn(crypto.SMB2_AES_128_GCM, ctx.ciphers)
221