diff options
Diffstat (limited to 'python/requests/test_requests.py')
-rwxr-xr-x | python/requests/test_requests.py | 1746 |
1 files changed, 1746 insertions, 0 deletions
diff --git a/python/requests/test_requests.py b/python/requests/test_requests.py new file mode 100755 index 0000000000..0795241867 --- /dev/null +++ b/python/requests/test_requests.py @@ -0,0 +1,1746 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +"""Tests for Requests.""" + +from __future__ import division +import json +import os +import pickle +import unittest +import collections +import contextlib + +import io +import requests +import pytest +from requests.adapters import HTTPAdapter +from requests.auth import HTTPDigestAuth, _basic_auth_str +from requests.compat import ( + Morsel, cookielib, getproxies, str, urljoin, urlparse, is_py3, + builtin_str, OrderedDict + ) +from requests.cookies import cookiejar_from_dict, morsel_to_cookie +from requests.exceptions import (ConnectionError, ConnectTimeout, + InvalidSchema, InvalidURL, MissingSchema, + ReadTimeout, Timeout, RetryError) +from requests.models import PreparedRequest +from requests.structures import CaseInsensitiveDict +from requests.sessions import SessionRedirectMixin +from requests.models import urlencode +from requests.hooks import default_hooks + +try: + import StringIO +except ImportError: + import io as StringIO + +try: + from multiprocessing.pool import ThreadPool +except ImportError: + ThreadPool = None + +if is_py3: + def u(s): + return s +else: + def u(s): + return s.decode('unicode-escape') + + +@pytest.fixture +def httpbin(httpbin): + # Issue #1483: Make sure the URL always has a trailing slash + httpbin_url = httpbin.url.rstrip('/') + '/' + + def inner(*suffix): + return urljoin(httpbin_url, '/'.join(suffix)) + + return inner + + +@pytest.fixture +def httpsbin_url(httpbin_secure): + # Issue #1483: Make sure the URL always has a trailing slash + httpbin_url = httpbin_secure.url.rstrip('/') + '/' + + def inner(*suffix): + return urljoin(httpbin_url, '/'.join(suffix)) + + return inner + + +# Requests to this URL should always fail with a connection timeout (nothing +# listening on that port) +TARPIT = "http://10.255.255.1" + +class TestRequests(object): + + _multiprocess_can_split_ = True + + def setUp(self): + """Create simple data set with headers.""" + pass + + def tearDown(self): + """Teardown.""" + pass + + def test_entry_points(self): + + requests.session + requests.session().get + requests.session().head + requests.get + requests.head + requests.put + requests.patch + requests.post + + def test_invalid_url(self): + with pytest.raises(MissingSchema): + requests.get('hiwpefhipowhefopw') + with pytest.raises(InvalidSchema): + requests.get('localhost:3128') + with pytest.raises(InvalidSchema): + requests.get('localhost.localdomain:3128/') + with pytest.raises(InvalidSchema): + requests.get('10.122.1.1:3128/') + with pytest.raises(InvalidURL): + requests.get('http://') + + def test_basic_building(self): + req = requests.Request() + req.url = 'http://kennethreitz.org/' + req.data = {'life': '42'} + + pr = req.prepare() + assert pr.url == req.url + assert pr.body == 'life=42' + + def test_no_content_length(self, httpbin): + get_req = requests.Request('GET', httpbin('get')).prepare() + assert 'Content-Length' not in get_req.headers + head_req = requests.Request('HEAD', httpbin('head')).prepare() + assert 'Content-Length' not in head_req.headers + + def test_override_content_length(self, httpbin): + headers = { + 'Content-Length': 'not zero' + } + r = requests.Request('POST', httpbin('post'), headers=headers).prepare() + assert 'Content-Length' in r.headers + assert r.headers['Content-Length'] == 'not zero' + + def test_path_is_not_double_encoded(self): + request = requests.Request('GET', "http://0.0.0.0/get/test case").prepare() + + assert request.path_url == '/get/test%20case' + + def test_params_are_added_before_fragment(self): + request = requests.Request('GET', + "http://example.com/path#fragment", params={"a": "b"}).prepare() + assert request.url == "http://example.com/path?a=b#fragment" + request = requests.Request('GET', + "http://example.com/path?key=value#fragment", params={"a": "b"}).prepare() + assert request.url == "http://example.com/path?key=value&a=b#fragment" + + def test_params_original_order_is_preserved_by_default(self): + param_ordered_dict = OrderedDict((('z', 1), ('a', 1), ('k', 1), ('d', 1))) + session = requests.Session() + request = requests.Request('GET', 'http://example.com/', params=param_ordered_dict) + prep = session.prepare_request(request) + assert prep.url == 'http://example.com/?z=1&a=1&k=1&d=1' + + def test_params_bytes_are_encoded(self): + request = requests.Request('GET', 'http://example.com', + params=b'test=foo').prepare() + assert request.url == 'http://example.com/?test=foo' + + def test_binary_put(self): + request = requests.Request('PUT', 'http://example.com', + data=u"ööö".encode("utf-8")).prepare() + assert isinstance(request.body, bytes) + + def test_mixed_case_scheme_acceptable(self, httpbin): + s = requests.Session() + s.proxies = getproxies() + parts = urlparse(httpbin('get')) + schemes = ['http://', 'HTTP://', 'hTTp://', 'HttP://'] + for scheme in schemes: + url = scheme + parts.netloc + parts.path + r = requests.Request('GET', url) + r = s.send(r.prepare()) + assert r.status_code == 200, 'failed for scheme {0}'.format(scheme) + + def test_HTTP_200_OK_GET_ALTERNATIVE(self, httpbin): + r = requests.Request('GET', httpbin('get')) + s = requests.Session() + s.proxies = getproxies() + + r = s.send(r.prepare()) + + assert r.status_code == 200 + + def test_HTTP_302_ALLOW_REDIRECT_GET(self, httpbin): + r = requests.get(httpbin('redirect', '1')) + assert r.status_code == 200 + assert r.history[0].status_code == 302 + assert r.history[0].is_redirect + + # def test_HTTP_302_ALLOW_REDIRECT_POST(self): + # r = requests.post(httpbin('status', '302'), data={'some': 'data'}) + # self.assertEqual(r.status_code, 200) + + def test_HTTP_200_OK_GET_WITH_PARAMS(self, httpbin): + heads = {'User-agent': 'Mozilla/5.0'} + + r = requests.get(httpbin('user-agent'), headers=heads) + + assert heads['User-agent'] in r.text + assert r.status_code == 200 + + def test_HTTP_200_OK_GET_WITH_MIXED_PARAMS(self, httpbin): + heads = {'User-agent': 'Mozilla/5.0'} + + r = requests.get(httpbin('get') + '?test=true', params={'q': 'test'}, headers=heads) + assert r.status_code == 200 + + def test_set_cookie_on_301(self, httpbin): + s = requests.session() + url = httpbin('cookies/set?foo=bar') + s.get(url) + assert s.cookies['foo'] == 'bar' + + def test_cookie_sent_on_redirect(self, httpbin): + s = requests.session() + s.get(httpbin('cookies/set?foo=bar')) + r = s.get(httpbin('redirect/1')) # redirects to httpbin('get') + assert 'Cookie' in r.json()['headers'] + + def test_cookie_removed_on_expire(self, httpbin): + s = requests.session() + s.get(httpbin('cookies/set?foo=bar')) + assert s.cookies['foo'] == 'bar' + s.get( + httpbin('response-headers'), + params={ + 'Set-Cookie': + 'foo=deleted; expires=Thu, 01-Jan-1970 00:00:01 GMT' + } + ) + assert 'foo' not in s.cookies + + def test_cookie_quote_wrapped(self, httpbin): + s = requests.session() + s.get(httpbin('cookies/set?foo="bar:baz"')) + assert s.cookies['foo'] == '"bar:baz"' + + def test_cookie_persists_via_api(self, httpbin): + s = requests.session() + r = s.get(httpbin('redirect/1'), cookies={'foo': 'bar'}) + assert 'foo' in r.request.headers['Cookie'] + assert 'foo' in r.history[0].request.headers['Cookie'] + + def test_request_cookie_overrides_session_cookie(self, httpbin): + s = requests.session() + s.cookies['foo'] = 'bar' + r = s.get(httpbin('cookies'), cookies={'foo': 'baz'}) + assert r.json()['cookies']['foo'] == 'baz' + # Session cookie should not be modified + assert s.cookies['foo'] == 'bar' + + def test_request_cookies_not_persisted(self, httpbin): + s = requests.session() + s.get(httpbin('cookies'), cookies={'foo': 'baz'}) + # Sending a request with cookies should not add cookies to the session + assert not s.cookies + + def test_generic_cookiejar_works(self, httpbin): + cj = cookielib.CookieJar() + cookiejar_from_dict({'foo': 'bar'}, cj) + s = requests.session() + s.cookies = cj + r = s.get(httpbin('cookies')) + # Make sure the cookie was sent + assert r.json()['cookies']['foo'] == 'bar' + # Make sure the session cj is still the custom one + assert s.cookies is cj + + def test_param_cookiejar_works(self, httpbin): + cj = cookielib.CookieJar() + cookiejar_from_dict({'foo': 'bar'}, cj) + s = requests.session() + r = s.get(httpbin('cookies'), cookies=cj) + # Make sure the cookie was sent + assert r.json()['cookies']['foo'] == 'bar' + + def test_requests_in_history_are_not_overridden(self, httpbin): + resp = requests.get(httpbin('redirect/3')) + urls = [r.url for r in resp.history] + req_urls = [r.request.url for r in resp.history] + assert urls == req_urls + + def test_history_is_always_a_list(self, httpbin): + """ + Show that even with redirects, Response.history is always a list. + """ + resp = requests.get(httpbin('get')) + assert isinstance(resp.history, list) + resp = requests.get(httpbin('redirect/1')) + assert isinstance(resp.history, list) + assert not isinstance(resp.history, tuple) + + def test_headers_on_session_with_None_are_not_sent(self, httpbin): + """Do not send headers in Session.headers with None values.""" + ses = requests.Session() + ses.headers['Accept-Encoding'] = None + req = requests.Request('GET', httpbin('get')) + prep = ses.prepare_request(req) + assert 'Accept-Encoding' not in prep.headers + + def test_user_agent_transfers(self, httpbin): + + heads = { + 'User-agent': 'Mozilla/5.0 (github.com/kennethreitz/requests)' + } + + r = requests.get(httpbin('user-agent'), headers=heads) + assert heads['User-agent'] in r.text + + heads = { + 'user-agent': 'Mozilla/5.0 (github.com/kennethreitz/requests)' + } + + r = requests.get(httpbin('user-agent'), headers=heads) + assert heads['user-agent'] in r.text + + def test_HTTP_200_OK_HEAD(self, httpbin): + r = requests.head(httpbin('get')) + assert r.status_code == 200 + + def test_HTTP_200_OK_PUT(self, httpbin): + r = requests.put(httpbin('put')) + assert r.status_code == 200 + + def test_BASICAUTH_TUPLE_HTTP_200_OK_GET(self, httpbin): + auth = ('user', 'pass') + url = httpbin('basic-auth', 'user', 'pass') + + r = requests.get(url, auth=auth) + assert r.status_code == 200 + + r = requests.get(url) + assert r.status_code == 401 + + s = requests.session() + s.auth = auth + r = s.get(url) + assert r.status_code == 200 + + def test_connection_error_invalid_domain(self): + """Connecting to an unknown domain should raise a ConnectionError""" + with pytest.raises(ConnectionError): + requests.get("http://doesnotexist.google.com") + + def test_connection_error_invalid_port(self): + """Connecting to an invalid port should raise a ConnectionError""" + with pytest.raises(ConnectionError): + requests.get("http://localhost:1", timeout=1) + + def test_LocationParseError(self): + """Inputing a URL that cannot be parsed should raise an InvalidURL error""" + with pytest.raises(InvalidURL): + requests.get("http://fe80::5054:ff:fe5a:fc0") + + def test_basicauth_with_netrc(self, httpbin): + auth = ('user', 'pass') + wrong_auth = ('wronguser', 'wrongpass') + url = httpbin('basic-auth', 'user', 'pass') + + old_auth = requests.sessions.get_netrc_auth + + try: + def get_netrc_auth_mock(url): + return auth + requests.sessions.get_netrc_auth = get_netrc_auth_mock + + # Should use netrc and work. + r = requests.get(url) + assert r.status_code == 200 + + # Given auth should override and fail. + r = requests.get(url, auth=wrong_auth) + assert r.status_code == 401 + + s = requests.session() + + # Should use netrc and work. + r = s.get(url) + assert r.status_code == 200 + + # Given auth should override and fail. + s.auth = wrong_auth + r = s.get(url) + assert r.status_code == 401 + finally: + requests.sessions.get_netrc_auth = old_auth + + def test_DIGEST_HTTP_200_OK_GET(self, httpbin): + + auth = HTTPDigestAuth('user', 'pass') + url = httpbin('digest-auth', 'auth', 'user', 'pass') + + r = requests.get(url, auth=auth) + assert r.status_code == 200 + + r = requests.get(url) + assert r.status_code == 401 + + s = requests.session() + s.auth = HTTPDigestAuth('user', 'pass') + r = s.get(url) + assert r.status_code == 200 + + def test_DIGEST_AUTH_RETURNS_COOKIE(self, httpbin): + url = httpbin('digest-auth', 'auth', 'user', 'pass') + auth = HTTPDigestAuth('user', 'pass') + r = requests.get(url) + assert r.cookies['fake'] == 'fake_value' + + r = requests.get(url, auth=auth) + assert r.status_code == 200 + + def test_DIGEST_AUTH_SETS_SESSION_COOKIES(self, httpbin): + url = httpbin('digest-auth', 'auth', 'user', 'pass') + auth = HTTPDigestAuth('user', 'pass') + s = requests.Session() + s.get(url, auth=auth) + assert s.cookies['fake'] == 'fake_value' + + def test_DIGEST_STREAM(self, httpbin): + + auth = HTTPDigestAuth('user', 'pass') + url = httpbin('digest-auth', 'auth', 'user', 'pass') + + r = requests.get(url, auth=auth, stream=True) + assert r.raw.read() != b'' + + r = requests.get(url, auth=auth, stream=False) + assert r.raw.read() == b'' + + def test_DIGESTAUTH_WRONG_HTTP_401_GET(self, httpbin): + + auth = HTTPDigestAuth('user', 'wrongpass') + url = httpbin('digest-auth', 'auth', 'user', 'pass') + + r = requests.get(url, auth=auth) + assert r.status_code == 401 + + r = requests.get(url) + assert r.status_code == 401 + + s = requests.session() + s.auth = auth + r = s.get(url) + assert r.status_code == 401 + + def test_DIGESTAUTH_QUOTES_QOP_VALUE(self, httpbin): + + auth = HTTPDigestAuth('user', 'pass') + url = httpbin('digest-auth', 'auth', 'user', 'pass') + + r = requests.get(url, auth=auth) + assert '"auth"' in r.request.headers['Authorization'] + + def test_POSTBIN_GET_POST_FILES(self, httpbin): + + url = httpbin('post') + post1 = requests.post(url).raise_for_status() + + post1 = requests.post(url, data={'some': 'data'}) + assert post1.status_code == 200 + + with open('requirements.txt') as f: + post2 = requests.post(url, files={'some': f}) + assert post2.status_code == 200 + + post4 = requests.post(url, data='[{"some": "json"}]') + assert post4.status_code == 200 + + with pytest.raises(ValueError): + requests.post(url, files=['bad file data']) + + def test_POSTBIN_GET_POST_FILES_WITH_DATA(self, httpbin): + + url = httpbin('post') + post1 = requests.post(url).raise_for_status() + + post1 = requests.post(url, data={'some': 'data'}) + assert post1.status_code == 200 + + with open('requirements.txt') as f: + post2 = requests.post(url, + data={'some': 'data'}, files={'some': f}) + assert post2.status_code == 200 + + post4 = requests.post(url, data='[{"some": "json"}]') + assert post4.status_code == 200 + + with pytest.raises(ValueError): + requests.post(url, files=['bad file data']) + + def test_conflicting_post_params(self, httpbin): + url = httpbin('post') + with open('requirements.txt') as f: + pytest.raises(ValueError, "requests.post(url, data='[{\"some\": \"data\"}]', files={'some': f})") + pytest.raises(ValueError, "requests.post(url, data=u('[{\"some\": \"data\"}]'), files={'some': f})") + + def test_request_ok_set(self, httpbin): + r = requests.get(httpbin('status', '404')) + assert not r.ok + + def test_status_raising(self, httpbin): + r = requests.get(httpbin('status', '404')) + with pytest.raises(requests.exceptions.HTTPError): + r.raise_for_status() + + r = requests.get(httpbin('status', '500')) + assert not r.ok + + def test_decompress_gzip(self, httpbin): + r = requests.get(httpbin('gzip')) + r.content.decode('ascii') + + def test_unicode_get(self, httpbin): + url = httpbin('/get') + requests.get(url, params={'foo': 'føø'}) + requests.get(url, params={'føø': 'føø'}) + requests.get(url, params={'føø': 'føø'}) + requests.get(url, params={'foo': 'foo'}) + requests.get(httpbin('ø'), params={'foo': 'foo'}) + + def test_unicode_header_name(self, httpbin): + requests.put( + httpbin('put'), + headers={str('Content-Type'): 'application/octet-stream'}, + data='\xff') # compat.str is unicode. + + def test_pyopenssl_redirect(self, httpsbin_url, httpbin_ca_bundle): + requests.get(httpsbin_url('status', '301'), verify=httpbin_ca_bundle) + + def test_urlencoded_get_query_multivalued_param(self, httpbin): + + r = requests.get(httpbin('get'), params=dict(test=['foo', 'baz'])) + assert r.status_code == 200 + assert r.url == httpbin('get?test=foo&test=baz') + + def test_different_encodings_dont_break_post(self, httpbin): + r = requests.post(httpbin('post'), + data={'stuff': json.dumps({'a': 123})}, + params={'blah': 'asdf1234'}, + files={'file': ('test_requests.py', open(__file__, 'rb'))}) + assert r.status_code == 200 + + def test_unicode_multipart_post(self, httpbin): + r = requests.post(httpbin('post'), + data={'stuff': u('ëlïxr')}, + files={'file': ('test_requests.py', open(__file__, 'rb'))}) + assert r.status_code == 200 + + r = requests.post(httpbin('post'), + data={'stuff': u('ëlïxr').encode('utf-8')}, + files={'file': ('test_requests.py', open(__file__, 'rb'))}) + assert r.status_code == 200 + + r = requests.post(httpbin('post'), + data={'stuff': 'elixr'}, + files={'file': ('test_requests.py', open(__file__, 'rb'))}) + assert r.status_code == 200 + + r = requests.post(httpbin('post'), + data={'stuff': 'elixr'.encode('utf-8')}, + files={'file': ('test_requests.py', open(__file__, 'rb'))}) + assert r.status_code == 200 + + def test_unicode_multipart_post_fieldnames(self, httpbin): + filename = os.path.splitext(__file__)[0] + '.py' + r = requests.Request(method='POST', + url=httpbin('post'), + data={'stuff'.encode('utf-8'): 'elixr'}, + files={'file': ('test_requests.py', + open(filename, 'rb'))}) + prep = r.prepare() + assert b'name="stuff"' in prep.body + assert b'name="b\'stuff\'"' not in prep.body + + def test_unicode_method_name(self, httpbin): + files = {'file': open('test_requests.py', 'rb')} + r = requests.request( + method=u('POST'), url=httpbin('post'), files=files) + assert r.status_code == 200 + + def test_unicode_method_name_with_request_object(self, httpbin): + files = {'file': open('test_requests.py', 'rb')} + s = requests.Session() + req = requests.Request(u("POST"), httpbin('post'), files=files) + prep = s.prepare_request(req) + assert isinstance(prep.method, builtin_str) + assert prep.method == "POST" + + resp = s.send(prep) + assert resp.status_code == 200 + + def test_custom_content_type(self, httpbin): + r = requests.post( + httpbin('post'), + data={'stuff': json.dumps({'a': 123})}, + files={'file1': ('test_requests.py', open(__file__, 'rb')), + 'file2': ('test_requests', open(__file__, 'rb'), + 'text/py-content-type')}) + assert r.status_code == 200 + assert b"text/py-content-type" in r.request.body + + def test_hook_receives_request_arguments(self, httpbin): + def hook(resp, **kwargs): + assert resp is not None + assert kwargs != {} + + requests.Request('GET', httpbin(), hooks={'response': hook}) + + def test_session_hooks_are_used_with_no_request_hooks(self, httpbin): + hook = lambda x, *args, **kwargs: x + s = requests.Session() + s.hooks['response'].append(hook) + r = requests.Request('GET', httpbin()) + prep = s.prepare_request(r) + assert prep.hooks['response'] != [] + assert prep.hooks['response'] == [hook] + + def test_session_hooks_are_overridden_by_request_hooks(self, httpbin): + hook1 = lambda x, *args, **kwargs: x + hook2 = lambda x, *args, **kwargs: x + assert hook1 is not hook2 + s = requests.Session() + s.hooks['response'].append(hook2) + r = requests.Request('GET', httpbin(), hooks={'response': [hook1]}) + prep = s.prepare_request(r) + assert prep.hooks['response'] == [hook1] + + def test_prepared_request_hook(self, httpbin): + def hook(resp, **kwargs): + resp.hook_working = True + return resp + + req = requests.Request('GET', httpbin(), hooks={'response': hook}) + prep = req.prepare() + + s = requests.Session() + s.proxies = getproxies() + resp = s.send(prep) + + assert hasattr(resp, 'hook_working') + + def test_prepared_from_session(self, httpbin): + class DummyAuth(requests.auth.AuthBase): + def __call__(self, r): + r.headers['Dummy-Auth-Test'] = 'dummy-auth-test-ok' + return r + + req = requests.Request('GET', httpbin('headers')) + assert not req.auth + + s = requests.Session() + s.auth = DummyAuth() + + prep = s.prepare_request(req) + resp = s.send(prep) + + assert resp.json()['headers'][ + 'Dummy-Auth-Test'] == 'dummy-auth-test-ok' + + def test_prepare_request_with_bytestring_url(self): + req = requests.Request('GET', b'https://httpbin.org/') + s = requests.Session() + prep = s.prepare_request(req) + assert prep.url == "https://httpbin.org/" + + def test_links(self): + r = requests.Response() + r.headers = { + 'cache-control': 'public, max-age=60, s-maxage=60', + 'connection': 'keep-alive', + 'content-encoding': 'gzip', + 'content-type': 'application/json; charset=utf-8', + 'date': 'Sat, 26 Jan 2013 16:47:56 GMT', + 'etag': '"6ff6a73c0e446c1f61614769e3ceb778"', + 'last-modified': 'Sat, 26 Jan 2013 16:22:39 GMT', + 'link': ('<https://api.github.com/users/kennethreitz/repos?' + 'page=2&per_page=10>; rel="next", <https://api.github.' + 'com/users/kennethreitz/repos?page=7&per_page=10>; ' + ' rel="last"'), + 'server': 'GitHub.com', + 'status': '200 OK', + 'vary': 'Accept', + 'x-content-type-options': 'nosniff', + 'x-github-media-type': 'github.beta', + 'x-ratelimit-limit': '60', + 'x-ratelimit-remaining': '57' + } + assert r.links['next']['rel'] == 'next' + + def test_cookie_parameters(self): + key = 'some_cookie' + value = 'some_value' + secure = True + domain = 'test.com' + rest = {'HttpOnly': True} + + jar = requests.cookies.RequestsCookieJar() + jar.set(key, value, secure=secure, domain=domain, rest=rest) + + assert len(jar) == 1 + assert 'some_cookie' in jar + + cookie = list(jar)[0] + assert cookie.secure == secure + assert cookie.domain == domain + assert cookie._rest['HttpOnly'] == rest['HttpOnly'] + + def test_cookie_as_dict_keeps_len(self): + key = 'some_cookie' + value = 'some_value' + + key1 = 'some_cookie1' + value1 = 'some_value1' + + jar = requests.cookies.RequestsCookieJar() + jar.set(key, value) + jar.set(key1, value1) + + d1 = dict(jar) + d2 = dict(jar.iteritems()) + d3 = dict(jar.items()) + + assert len(jar) == 2 + assert len(d1) == 2 + assert len(d2) == 2 + assert len(d3) == 2 + + def test_cookie_as_dict_keeps_items(self): + key = 'some_cookie' + value = 'some_value' + + key1 = 'some_cookie1' + value1 = 'some_value1' + + jar = requests.cookies.RequestsCookieJar() + jar.set(key, value) + jar.set(key1, value1) + + d1 = dict(jar) + d2 = dict(jar.iteritems()) + d3 = dict(jar.items()) + + assert d1['some_cookie'] == 'some_value' + assert d2['some_cookie'] == 'some_value' + assert d3['some_cookie1'] == 'some_value1' + + def test_cookie_as_dict_keys(self): + key = 'some_cookie' + value = 'some_value' + + key1 = 'some_cookie1' + value1 = 'some_value1' + + jar = requests.cookies.RequestsCookieJar() + jar.set(key, value) + jar.set(key1, value1) + + keys = jar.keys() + assert keys == list(keys) + # make sure one can use keys multiple times + assert list(keys) == list(keys) + + def test_cookie_as_dict_values(self): + key = 'some_cookie' + value = 'some_value' + + key1 = 'some_cookie1' + value1 = 'some_value1' + + jar = requests.cookies.RequestsCookieJar() + jar.set(key, value) + jar.set(key1, value1) + + values = jar.values() + assert values == list(values) + # make sure one can use values multiple times + assert list(values) == list(values) + + def test_cookie_as_dict_items(self): + key = 'some_cookie' + value = 'some_value' + + key1 = 'some_cookie1' + value1 = 'some_value1' + + jar = requests.cookies.RequestsCookieJar() + jar.set(key, value) + jar.set(key1, value1) + + items = jar.items() + assert items == list(items) + # make sure one can use items multiple times + assert list(items) == list(items) + + def test_time_elapsed_blank(self, httpbin): + r = requests.get(httpbin('get')) + td = r.elapsed + total_seconds = ((td.microseconds + (td.seconds + td.days * 24 * 3600) + * 10**6) / 10**6) + assert total_seconds > 0.0 + + def test_response_is_iterable(self): + r = requests.Response() + io = StringIO.StringIO('abc') + read_ = io.read + + def read_mock(amt, decode_content=None): + return read_(amt) + setattr(io, 'read', read_mock) + r.raw = io + assert next(iter(r)) + io.close() + + def test_response_decode_unicode(self): + """ + When called with decode_unicode, Response.iter_content should always + return unicode. + """ + r = requests.Response() + r._content_consumed = True + r._content = b'the content' + r.encoding = 'ascii' + + chunks = r.iter_content(decode_unicode=True) + assert all(isinstance(chunk, str) for chunk in chunks) + + # also for streaming + r = requests.Response() + r.raw = io.BytesIO(b'the content') + r.encoding = 'ascii' + chunks = r.iter_content(decode_unicode=True) + assert all(isinstance(chunk, str) for chunk in chunks) + + def test_request_and_response_are_pickleable(self, httpbin): + r = requests.get(httpbin('get')) + + # verify we can pickle the original request + assert pickle.loads(pickle.dumps(r.request)) + + # verify we can pickle the response and that we have access to + # the original request. + pr = pickle.loads(pickle.dumps(r)) + assert r.request.url == pr.request.url + assert r.request.headers == pr.request.headers + + def test_get_auth_from_url(self): + url = 'http://user:pass@complex.url.com/path?query=yes' + assert ('user', 'pass') == requests.utils.get_auth_from_url(url) + + def test_get_auth_from_url_encoded_spaces(self): + url = 'http://user:pass%20pass@complex.url.com/path?query=yes' + assert ('user', 'pass pass') == requests.utils.get_auth_from_url(url) + + def test_get_auth_from_url_not_encoded_spaces(self): + url = 'http://user:pass pass@complex.url.com/path?query=yes' + assert ('user', 'pass pass') == requests.utils.get_auth_from_url(url) + + def test_get_auth_from_url_percent_chars(self): + url = 'http://user%25user:pass@complex.url.com/path?query=yes' + assert ('user%user', 'pass') == requests.utils.get_auth_from_url(url) + + def test_get_auth_from_url_encoded_hashes(self): + url = 'http://user:pass%23pass@complex.url.com/path?query=yes' + assert ('user', 'pass#pass') == requests.utils.get_auth_from_url(url) + + def test_cannot_send_unprepared_requests(self, httpbin): + r = requests.Request(url=httpbin()) + with pytest.raises(ValueError): + requests.Session().send(r) + + def test_http_error(self): + error = requests.exceptions.HTTPError() + assert not error.response + response = requests.Response() + error = requests.exceptions.HTTPError(response=response) + assert error.response == response + error = requests.exceptions.HTTPError('message', response=response) + assert str(error) == 'message' + assert error.response == response + + def test_session_pickling(self, httpbin): + r = requests.Request('GET', httpbin('get')) + s = requests.Session() + + s = pickle.loads(pickle.dumps(s)) + s.proxies = getproxies() + + r = s.send(r.prepare()) + assert r.status_code == 200 + + def test_fixes_1329(self, httpbin): + """ + Ensure that header updates are done case-insensitively. + """ + s = requests.Session() + s.headers.update({'ACCEPT': 'BOGUS'}) + s.headers.update({'accept': 'application/json'}) + r = s.get(httpbin('get')) + headers = r.request.headers + assert headers['accept'] == 'application/json' + assert headers['Accept'] == 'application/json' + assert headers['ACCEPT'] == 'application/json' + + def test_uppercase_scheme_redirect(self, httpbin): + parts = urlparse(httpbin('html')) + url = "HTTP://" + parts.netloc + parts.path + r = requests.get(httpbin('redirect-to'), params={'url': url}) + assert r.status_code == 200 + assert r.url.lower() == url.lower() + + def test_transport_adapter_ordering(self): + s = requests.Session() + order = ['https://', 'http://'] + assert order == list(s.adapters) + s.mount('http://git', HTTPAdapter()) + s.mount('http://github', HTTPAdapter()) + s.mount('http://github.com', HTTPAdapter()) + s.mount('http://github.com/about/', HTTPAdapter()) + order = [ + 'http://github.com/about/', + 'http://github.com', + 'http://github', + 'http://git', + 'https://', + 'http://', + ] + assert order == list(s.adapters) + s.mount('http://gittip', HTTPAdapter()) + s.mount('http://gittip.com', HTTPAdapter()) + s.mount('http://gittip.com/about/', HTTPAdapter()) + order = [ + 'http://github.com/about/', + 'http://gittip.com/about/', + 'http://github.com', + 'http://gittip.com', + 'http://github', + 'http://gittip', + 'http://git', + 'https://', + 'http://', + ] + assert order == list(s.adapters) + s2 = requests.Session() + s2.adapters = {'http://': HTTPAdapter()} + s2.mount('https://', HTTPAdapter()) + assert 'http://' in s2.adapters + assert 'https://' in s2.adapters + + def test_header_remove_is_case_insensitive(self, httpbin): + # From issue #1321 + s = requests.Session() + s.headers['foo'] = 'bar' + r = s.get(httpbin('get'), headers={'FOO': None}) + assert 'foo' not in r.request.headers + + def test_params_are_merged_case_sensitive(self, httpbin): + s = requests.Session() + s.params['foo'] = 'bar' + r = s.get(httpbin('get'), params={'FOO': 'bar'}) + assert r.json()['args'] == {'foo': 'bar', 'FOO': 'bar'} + + def test_long_authinfo_in_url(self): + url = 'http://{0}:{1}@{2}:9000/path?query#frag'.format( + 'E8A3BE87-9E3F-4620-8858-95478E385B5B', + 'EA770032-DA4D-4D84-8CE9-29C6D910BF1E', + 'exactly-------------sixty-----------three------------characters', + ) + r = requests.Request('GET', url).prepare() + assert r.url == url + + def test_header_keys_are_native(self, httpbin): + headers = {u('unicode'): 'blah', 'byte'.encode('ascii'): 'blah'} + r = requests.Request('GET', httpbin('get'), headers=headers) + p = r.prepare() + + # This is testing that they are builtin strings. A bit weird, but there + # we go. + assert 'unicode' in p.headers.keys() + assert 'byte' in p.headers.keys() + + def test_can_send_nonstring_objects_with_files(self, httpbin): + data = {'a': 0.0} + files = {'b': 'foo'} + r = requests.Request('POST', httpbin('post'), data=data, files=files) + p = r.prepare() + + assert 'multipart/form-data' in p.headers['Content-Type'] + + def test_can_send_bytes_bytearray_objects_with_files(self, httpbin): + # Test bytes: + data = {'a': 'this is a string'} + files = {'b': b'foo'} + r = requests.Request('POST', httpbin('post'), data=data, files=files) + p = r.prepare() + assert 'multipart/form-data' in p.headers['Content-Type'] + # Test bytearrays: + files = {'b': bytearray(b'foo')} + r = requests.Request('POST', httpbin('post'), data=data, files=files) + p = r.prepare() + assert 'multipart/form-data' in p.headers['Content-Type'] + + def test_can_send_file_object_with_non_string_filename(self, httpbin): + f = io.BytesIO() + f.name = 2 + r = requests.Request('POST', httpbin('post'), files={'f': f}) + p = r.prepare() + + assert 'multipart/form-data' in p.headers['Content-Type'] + + def test_autoset_header_values_are_native(self, httpbin): + data = 'this is a string' + length = '16' + req = requests.Request('POST', httpbin('post'), data=data) + p = req.prepare() + + assert p.headers['Content-Length'] == length + + def test_nonhttp_schemes_dont_check_URLs(self): + test_urls = ( + 'data:image/gif;base64,R0lGODlhAQABAHAAACH5BAUAAAAALAAAAAABAAEAAAICRAEAOw==', + 'file:///etc/passwd', + 'magnet:?xt=urn:btih:be08f00302bc2d1d3cfa3af02024fa647a271431', + ) + for test_url in test_urls: + req = requests.Request('GET', test_url) + preq = req.prepare() + assert test_url == preq.url + + def test_auth_is_stripped_on_redirect_off_host(self, httpbin): + r = requests.get( + httpbin('redirect-to'), + params={'url': 'http://www.google.co.uk'}, + auth=('user', 'pass'), + ) + assert r.history[0].request.headers['Authorization'] + assert not r.request.headers.get('Authorization', '') + + def test_auth_is_retained_for_redirect_on_host(self, httpbin): + r = requests.get(httpbin('redirect/1'), auth=('user', 'pass')) + h1 = r.history[0].request.headers['Authorization'] + h2 = r.request.headers['Authorization'] + + assert h1 == h2 + + def test_manual_redirect_with_partial_body_read(self, httpbin): + s = requests.Session() + r1 = s.get(httpbin('redirect/2'), allow_redirects=False, stream=True) + assert r1.is_redirect + rg = s.resolve_redirects(r1, r1.request, stream=True) + + # read only the first eight bytes of the response body, + # then follow the redirect + r1.iter_content(8) + r2 = next(rg) + assert r2.is_redirect + + # read all of the response via iter_content, + # then follow the redirect + for _ in r2.iter_content(): + pass + r3 = next(rg) + assert not r3.is_redirect + + def _patch_adapter_gzipped_redirect(self, session, url): + adapter = session.get_adapter(url=url) + org_build_response = adapter.build_response + self._patched_response = False + + def build_response(*args, **kwargs): + resp = org_build_response(*args, **kwargs) + if not self._patched_response: + resp.raw.headers['content-encoding'] = 'gzip' + self._patched_response = True + return resp + + adapter.build_response = build_response + + def test_redirect_with_wrong_gzipped_header(self, httpbin): + s = requests.Session() + url = httpbin('redirect/1') + self._patch_adapter_gzipped_redirect(s, url) + s.get(url) + + def test_basic_auth_str_is_always_native(self): + s = _basic_auth_str("test", "test") + assert isinstance(s, builtin_str) + assert s == "Basic dGVzdDp0ZXN0" + + def test_requests_history_is_saved(self, httpbin): + r = requests.get(httpbin('redirect/5')) + total = r.history[-1].history + i = 0 + for item in r.history: + assert item.history == total[0:i] + i = i + 1 + + def test_json_param_post_content_type_works(self, httpbin): + r = requests.post( + httpbin('post'), + json={'life': 42} + ) + assert r.status_code == 200 + assert 'application/json' in r.request.headers['Content-Type'] + assert {'life': 42} == r.json()['json'] + + def test_json_param_post_should_not_override_data_param(self, httpbin): + r = requests.Request(method='POST', url=httpbin('post'), + data={'stuff': 'elixr'}, + json={'music': 'flute'}) + prep = r.prepare() + assert 'stuff=elixr' == prep.body + + def test_response_iter_lines(self, httpbin): + r = requests.get(httpbin('stream/4'), stream=True) + assert r.status_code == 200 + + it = r.iter_lines() + next(it) + assert len(list(it)) == 3 + + def test_unconsumed_session_response_closes_connection(self, httpbin): + s = requests.session() + + with contextlib.closing(s.get(httpbin('stream/4'), stream=True)) as response: + pass + + assert response._content_consumed is False + assert response.raw.closed + + @pytest.mark.xfail + def test_response_iter_lines_reentrant(self, httpbin): + """Response.iter_lines() is not reentrant safe""" + r = requests.get(httpbin('stream/4'), stream=True) + assert r.status_code == 200 + + next(r.iter_lines()) + assert len(list(r.iter_lines())) == 3 + + +class TestContentEncodingDetection(unittest.TestCase): + + def test_none(self): + encodings = requests.utils.get_encodings_from_content('') + assert not len(encodings) + + def test_html_charset(self): + """HTML5 meta charset attribute""" + content = '<meta charset="UTF-8">' + encodings = requests.utils.get_encodings_from_content(content) + assert len(encodings) == 1 + assert encodings[0] == 'UTF-8' + + def test_html4_pragma(self): + """HTML4 pragma directive""" + content = '<meta http-equiv="Content-type" content="text/html;charset=UTF-8">' + encodings = requests.utils.get_encodings_from_content(content) + assert len(encodings) == 1 + assert encodings[0] == 'UTF-8' + + def test_xhtml_pragma(self): + """XHTML 1.x served with text/html MIME type""" + content = '<meta http-equiv="Content-type" content="text/html;charset=UTF-8" />' + encodings = requests.utils.get_encodings_from_content(content) + assert len(encodings) == 1 + assert encodings[0] == 'UTF-8' + + def test_xml(self): + """XHTML 1.x served as XML""" + content = '<?xml version="1.0" encoding="UTF-8"?>' + encodings = requests.utils.get_encodings_from_content(content) + assert len(encodings) == 1 + assert encodings[0] == 'UTF-8' + + def test_precedence(self): + content = ''' + <?xml version="1.0" encoding="XML"?> + <meta charset="HTML5"> + <meta http-equiv="Content-type" content="text/html;charset=HTML4" /> + '''.strip() + encodings = requests.utils.get_encodings_from_content(content) + assert encodings == ['HTML5', 'HTML4', 'XML'] + + +class TestCaseInsensitiveDict(unittest.TestCase): + + def test_mapping_init(self): + cid = CaseInsensitiveDict({'Foo': 'foo', 'BAr': 'bar'}) + assert len(cid) == 2 + assert 'foo' in cid + assert 'bar' in cid + + def test_iterable_init(self): + cid = CaseInsensitiveDict([('Foo', 'foo'), ('BAr', 'bar')]) + assert len(cid) == 2 + assert 'foo' in cid + assert 'bar' in cid + + def test_kwargs_init(self): + cid = CaseInsensitiveDict(FOO='foo', BAr='bar') + assert len(cid) == 2 + assert 'foo' in cid + assert 'bar' in cid + + def test_docstring_example(self): + cid = CaseInsensitiveDict() + cid['Accept'] = 'application/json' + assert cid['aCCEPT'] == 'application/json' + assert list(cid) == ['Accept'] + + def test_len(self): + cid = CaseInsensitiveDict({'a': 'a', 'b': 'b'}) + cid['A'] = 'a' + assert len(cid) == 2 + + def test_getitem(self): + cid = CaseInsensitiveDict({'Spam': 'blueval'}) + assert cid['spam'] == 'blueval' + assert cid['SPAM'] == 'blueval' + + def test_fixes_649(self): + """__setitem__ should behave case-insensitively.""" + cid = CaseInsensitiveDict() + cid['spam'] = 'oneval' + cid['Spam'] = 'twoval' + cid['sPAM'] = 'redval' + cid['SPAM'] = 'blueval' + assert cid['spam'] == 'blueval' + assert cid['SPAM'] == 'blueval' + assert list(cid.keys()) == ['SPAM'] + + def test_delitem(self): + cid = CaseInsensitiveDict() + cid['Spam'] = 'someval' + del cid['sPam'] + assert 'spam' not in cid + assert len(cid) == 0 + + def test_contains(self): + cid = CaseInsensitiveDict() + cid['Spam'] = 'someval' + assert 'Spam' in cid + assert 'spam' in cid + assert 'SPAM' in cid + assert 'sPam' in cid + assert 'notspam' not in cid + + def test_get(self): + cid = CaseInsensitiveDict() + cid['spam'] = 'oneval' + cid['SPAM'] = 'blueval' + assert cid.get('spam') == 'blueval' + assert cid.get('SPAM') == 'blueval' + assert cid.get('sPam') == 'blueval' + assert cid.get('notspam', 'default') == 'default' + + def test_update(self): + cid = CaseInsensitiveDict() + cid['spam'] = 'blueval' + cid.update({'sPam': 'notblueval'}) + assert cid['spam'] == 'notblueval' + cid = CaseInsensitiveDict({'Foo': 'foo', 'BAr': 'bar'}) + cid.update({'fOO': 'anotherfoo', 'bAR': 'anotherbar'}) + assert len(cid) == 2 + assert cid['foo'] == 'anotherfoo' + assert cid['bar'] == 'anotherbar' + + def test_update_retains_unchanged(self): + cid = CaseInsensitiveDict({'foo': 'foo', 'bar': 'bar'}) + cid.update({'foo': 'newfoo'}) + assert cid['bar'] == 'bar' + + def test_iter(self): + cid = CaseInsensitiveDict({'Spam': 'spam', 'Eggs': 'eggs'}) + keys = frozenset(['Spam', 'Eggs']) + assert frozenset(iter(cid)) == keys + + def test_equality(self): + cid = CaseInsensitiveDict({'SPAM': 'blueval', 'Eggs': 'redval'}) + othercid = CaseInsensitiveDict({'spam': 'blueval', 'eggs': 'redval'}) + assert cid == othercid + del othercid['spam'] + assert cid != othercid + assert cid == {'spam': 'blueval', 'eggs': 'redval'} + assert cid != object() + + def test_setdefault(self): + cid = CaseInsensitiveDict({'Spam': 'blueval'}) + assert cid.setdefault('spam', 'notblueval') == 'blueval' + assert cid.setdefault('notspam', 'notblueval') == 'notblueval' + + def test_lower_items(self): + cid = CaseInsensitiveDict({ + 'Accept': 'application/json', + 'user-Agent': 'requests', + }) + keyset = frozenset(lowerkey for lowerkey, v in cid.lower_items()) + lowerkeyset = frozenset(['accept', 'user-agent']) + assert keyset == lowerkeyset + + def test_preserve_key_case(self): + cid = CaseInsensitiveDict({ + 'Accept': 'application/json', + 'user-Agent': 'requests', + }) + keyset = frozenset(['Accept', 'user-Agent']) + assert frozenset(i[0] for i in cid.items()) == keyset + assert frozenset(cid.keys()) == keyset + assert frozenset(cid) == keyset + + def test_preserve_last_key_case(self): + cid = CaseInsensitiveDict({ + 'Accept': 'application/json', + 'user-Agent': 'requests', + }) + cid.update({'ACCEPT': 'application/json'}) + cid['USER-AGENT'] = 'requests' + keyset = frozenset(['ACCEPT', 'USER-AGENT']) + assert frozenset(i[0] for i in cid.items()) == keyset + assert frozenset(cid.keys()) == keyset + assert frozenset(cid) == keyset + + def test_copy(self): + cid = CaseInsensitiveDict({ + 'Accept': 'application/json', + 'user-Agent': 'requests', + }) + cid_copy = cid.copy() + assert cid == cid_copy + cid['changed'] = True + assert cid != cid_copy + + +class UtilsTestCase(unittest.TestCase): + + def test_super_len_io_streams(self): + """ Ensures that we properly deal with different kinds of IO streams. """ + # uses StringIO or io.StringIO (see import above) + from io import BytesIO + from requests.utils import super_len + + assert super_len(StringIO.StringIO()) == 0 + assert super_len( + StringIO.StringIO('with so much drama in the LBC')) == 29 + + assert super_len(BytesIO()) == 0 + assert super_len( + BytesIO(b"it's kinda hard bein' snoop d-o-double-g")) == 40 + + try: + import cStringIO + except ImportError: + pass + else: + assert super_len( + cStringIO.StringIO('but some how, some way...')) == 25 + + def test_super_len_correctly_calculates_len_of_partially_read_file(self): + """Ensure that we handle partially consumed file like objects.""" + from requests.utils import super_len + s = StringIO.StringIO() + s.write('foobarbogus') + assert super_len(s) == 0 + + def test_get_environ_proxies_ip_ranges(self): + """Ensures that IP addresses are correctly matches with ranges + in no_proxy variable.""" + from requests.utils import get_environ_proxies + os.environ['no_proxy'] = "192.168.0.0/24,127.0.0.1,localhost.localdomain,172.16.1.1" + assert get_environ_proxies('http://192.168.0.1:5000/') == {} + assert get_environ_proxies('http://192.168.0.1/') == {} + assert get_environ_proxies('http://172.16.1.1/') == {} + assert get_environ_proxies('http://172.16.1.1:5000/') == {} + assert get_environ_proxies('http://192.168.1.1:5000/') != {} + assert get_environ_proxies('http://192.168.1.1/') != {} + + def test_get_environ_proxies(self): + """Ensures that IP addresses are correctly matches with ranges + in no_proxy variable.""" + from requests.utils import get_environ_proxies + os.environ['no_proxy'] = "127.0.0.1,localhost.localdomain,192.168.0.0/24,172.16.1.1" + assert get_environ_proxies( + 'http://localhost.localdomain:5000/v1.0/') == {} + assert get_environ_proxies('http://www.requests.com/') != {} + + def test_select_proxies(self): + """Make sure we can select per-host proxies correctly.""" + from requests.utils import select_proxy + proxies = {'http': 'http://http.proxy', + 'http://some.host': 'http://some.host.proxy'} + assert select_proxy('hTTp://u:p@Some.Host/path', proxies) == 'http://some.host.proxy' + assert select_proxy('hTTp://u:p@Other.Host/path', proxies) == 'http://http.proxy' + assert select_proxy('hTTps://Other.Host', proxies) is None + + def test_guess_filename_when_int(self): + from requests.utils import guess_filename + assert None is guess_filename(1) + + def test_guess_filename_when_filename_is_an_int(self): + from requests.utils import guess_filename + fake = type('Fake', (object,), {'name': 1})() + assert None is guess_filename(fake) + + def test_guess_filename_with_file_like_obj(self): + from requests.utils import guess_filename + from requests import compat + fake = type('Fake', (object,), {'name': b'value'})() + guessed_name = guess_filename(fake) + assert b'value' == guessed_name + assert isinstance(guessed_name, compat.bytes) + + def test_guess_filename_with_unicode_name(self): + from requests.utils import guess_filename + from requests import compat + filename = b'value'.decode('utf-8') + fake = type('Fake', (object,), {'name': filename})() + guessed_name = guess_filename(fake) + assert filename == guessed_name + assert isinstance(guessed_name, compat.str) + + def test_is_ipv4_address(self): + from requests.utils import is_ipv4_address + assert is_ipv4_address('8.8.8.8') + assert not is_ipv4_address('8.8.8.8.8') + assert not is_ipv4_address('localhost.localdomain') + + def test_is_valid_cidr(self): + from requests.utils import is_valid_cidr + assert not is_valid_cidr('8.8.8.8') + assert is_valid_cidr('192.168.1.0/24') + + def test_dotted_netmask(self): + from requests.utils import dotted_netmask + assert dotted_netmask(8) == '255.0.0.0' + assert dotted_netmask(24) == '255.255.255.0' + assert dotted_netmask(25) == '255.255.255.128' + + def test_address_in_network(self): + from requests.utils import address_in_network + assert address_in_network('192.168.1.1', '192.168.1.0/24') + assert not address_in_network('172.16.0.1', '192.168.1.0/24') + + def test_get_auth_from_url(self): + """Ensures that username and password in well-encoded URI as per + RFC 3986 are correclty extracted.""" + from requests.utils import get_auth_from_url + from requests.compat import quote + percent_encoding_test_chars = "%!*'();:@&=+$,/?#[] " + url_address = "request.com/url.html#test" + url = "http://" + quote( + percent_encoding_test_chars, '') + ':' + quote( + percent_encoding_test_chars, '') + '@' + url_address + (username, password) = get_auth_from_url(url) + assert username == percent_encoding_test_chars + assert password == percent_encoding_test_chars + + def test_requote_uri_with_unquoted_percents(self): + """Ensure we handle unquoted percent signs in redirects. + + See: https://github.com/kennethreitz/requests/issues/2356 + """ + from requests.utils import requote_uri + bad_uri = 'http://example.com/fiz?buz=%ppicture' + quoted = 'http://example.com/fiz?buz=%25ppicture' + assert quoted == requote_uri(bad_uri) + + def test_requote_uri_properly_requotes(self): + """Ensure requoting doesn't break expectations.""" + from requests.utils import requote_uri + quoted = 'http://example.com/fiz?buz=%25ppicture' + assert quoted == requote_uri(quoted) + + +class TestMorselToCookieExpires(unittest.TestCase): + + """Tests for morsel_to_cookie when morsel contains expires.""" + + def test_expires_valid_str(self): + """Test case where we convert expires from string time.""" + + morsel = Morsel() + morsel['expires'] = 'Thu, 01-Jan-1970 00:00:01 GMT' + cookie = morsel_to_cookie(morsel) + assert cookie.expires == 1 + + def test_expires_invalid_int(self): + """Test case where an invalid type is passed for expires.""" + + morsel = Morsel() + morsel['expires'] = 100 + with pytest.raises(TypeError): + morsel_to_cookie(morsel) + + def test_expires_invalid_str(self): + """Test case where an invalid string is input.""" + + morsel = Morsel() + morsel['expires'] = 'woops' + with pytest.raises(ValueError): + morsel_to_cookie(morsel) + + def test_expires_none(self): + """Test case where expires is None.""" + + morsel = Morsel() + morsel['expires'] = None + cookie = morsel_to_cookie(morsel) + assert cookie.expires is None + + +class TestMorselToCookieMaxAge(unittest.TestCase): + + """Tests for morsel_to_cookie when morsel contains max-age.""" + + def test_max_age_valid_int(self): + """Test case where a valid max age in seconds is passed.""" + + morsel = Morsel() + morsel['max-age'] = 60 + cookie = morsel_to_cookie(morsel) + assert isinstance(cookie.expires, int) + + def test_max_age_invalid_str(self): + """Test case where a invalid max age is passed.""" + + morsel = Morsel() + morsel['max-age'] = 'woops' + with pytest.raises(TypeError): + morsel_to_cookie(morsel) + + +class TestTimeout: + def test_stream_timeout(self, httpbin): + try: + requests.get(httpbin('delay/10'), timeout=2.0) + except requests.exceptions.Timeout as e: + assert 'Read timed out' in e.args[0].args[0] + + def test_invalid_timeout(self, httpbin): + with pytest.raises(ValueError) as e: + requests.get(httpbin('get'), timeout=(3, 4, 5)) + assert '(connect, read)' in str(e) + + with pytest.raises(ValueError) as e: + requests.get(httpbin('get'), timeout="foo") + assert 'must be an int or float' in str(e) + + def test_none_timeout(self, httpbin): + """ Check that you can set None as a valid timeout value. + + To actually test this behavior, we'd want to check that setting the + timeout to None actually lets the request block past the system default + timeout. However, this would make the test suite unbearably slow. + Instead we verify that setting the timeout to None does not prevent the + request from succeeding. + """ + r = requests.get(httpbin('get'), timeout=None) + assert r.status_code == 200 + + def test_read_timeout(self, httpbin): + try: + requests.get(httpbin('delay/10'), timeout=(None, 0.1)) + assert False, "The recv() request should time out." + except ReadTimeout: + pass + + def test_connect_timeout(self): + try: + requests.get(TARPIT, timeout=(0.1, None)) + assert False, "The connect() request should time out." + except ConnectTimeout as e: + assert isinstance(e, ConnectionError) + assert isinstance(e, Timeout) + + def test_total_timeout_connect(self): + try: + requests.get(TARPIT, timeout=(0.1, 0.1)) + assert False, "The connect() request should time out." + except ConnectTimeout: + pass + + def test_encoded_methods(self, httpbin): + """See: https://github.com/kennethreitz/requests/issues/2316""" + r = requests.request(b'GET', httpbin('get')) + assert r.ok + + +SendCall = collections.namedtuple('SendCall', ('args', 'kwargs')) + + +class RedirectSession(SessionRedirectMixin): + def __init__(self, order_of_redirects): + self.redirects = order_of_redirects + self.calls = [] + self.max_redirects = 30 + self.cookies = {} + self.trust_env = False + + def send(self, *args, **kwargs): + self.calls.append(SendCall(args, kwargs)) + return self.build_response() + + def build_response(self): + request = self.calls[-1].args[0] + r = requests.Response() + + try: + r.status_code = int(self.redirects.pop(0)) + except IndexError: + r.status_code = 200 + + r.headers = CaseInsensitiveDict({'Location': '/'}) + r.raw = self._build_raw() + r.request = request + return r + + def _build_raw(self): + string = StringIO.StringIO('') + setattr(string, 'release_conn', lambda *args: args) + return string + + +class TestRedirects: + default_keyword_args = { + 'stream': False, + 'verify': True, + 'cert': None, + 'timeout': None, + 'allow_redirects': False, + 'proxies': {}, + } + + def test_requests_are_updated_each_time(self, httpbin): + session = RedirectSession([303, 307]) + prep = requests.Request('POST', httpbin('post')).prepare() + r0 = session.send(prep) + assert r0.request.method == 'POST' + assert session.calls[-1] == SendCall((r0.request,), {}) + redirect_generator = session.resolve_redirects(r0, prep) + for response in redirect_generator: + assert response.request.method == 'GET' + send_call = SendCall((response.request,), + TestRedirects.default_keyword_args) + assert session.calls[-1] == send_call + + + +@pytest.fixture +def list_of_tuples(): + return [ + (('a', 'b'), ('c', 'd')), + (('c', 'd'), ('a', 'b')), + (('a', 'b'), ('c', 'd'), ('e', 'f')), + ] + + +def test_data_argument_accepts_tuples(list_of_tuples): + """ + Ensure that the data argument will accept tuples of strings + and properly encode them. + """ + for data in list_of_tuples: + p = PreparedRequest() + p.prepare( + method='GET', + url='http://www.example.com', + data=data, + hooks=default_hooks() + ) + assert p.body == urlencode(data) + + +def assert_copy(p, p_copy): + for attr in ('method', 'url', 'headers', '_cookies', 'body', 'hooks'): + assert getattr(p, attr) == getattr(p_copy, attr) + + +def test_prepared_request_empty_copy(): + p = PreparedRequest() + assert_copy(p, p.copy()) + + +def test_prepared_request_no_cookies_copy(): + p = PreparedRequest() + p.prepare( + method='GET', + url='http://www.example.com', + data='foo=bar', + hooks=default_hooks() + ) + assert_copy(p, p.copy()) + + +def test_prepared_request_complete_copy(): + p = PreparedRequest() + p.prepare( + method='GET', + url='http://www.example.com', + data='foo=bar', + hooks=default_hooks(), + cookies={'foo': 'bar'} + ) + assert_copy(p, p.copy()) + + +def test_prepare_unicode_url(): + p = PreparedRequest() + p.prepare( + method='GET', + url=u('http://www.example.com/üniçø∂é'), + ) + assert_copy(p, p.copy()) + + +def test_urllib3_retries(httpbin): + from requests.packages.urllib3.util import Retry + s = requests.Session() + s.mount('http://', HTTPAdapter(max_retries=Retry( + total=2, status_forcelist=[500] + ))) + + with pytest.raises(RetryError): + s.get(httpbin('status/500')) + + +def test_urllib3_pool_connection_closed(httpbin): + s = requests.Session() + s.mount('http://', HTTPAdapter(pool_connections=0, pool_maxsize=0)) + + try: + s.get(httpbin('status/200')) + except ConnectionError as e: + assert u"Pool is closed." in str(e) + + +def test_vendor_aliases(): + from requests.packages import urllib3 + from requests.packages import chardet + + with pytest.raises(ImportError): + from requests.packages import webbrowser + + +if __name__ == '__main__': + unittest.main() |