summaryrefslogtreecommitdiff
path: root/python/requests/test_requests.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/requests/test_requests.py')
-rwxr-xr-xpython/requests/test_requests.py1746
1 files changed, 1746 insertions, 0 deletions
diff --git a/python/requests/test_requests.py b/python/requests/test_requests.py
new file mode 100755
index 0000000000..0795241867
--- /dev/null
+++ b/python/requests/test_requests.py
@@ -0,0 +1,1746 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""Tests for Requests."""
+
+from __future__ import division
+import json
+import os
+import pickle
+import unittest
+import collections
+import contextlib
+
+import io
+import requests
+import pytest
+from requests.adapters import HTTPAdapter
+from requests.auth import HTTPDigestAuth, _basic_auth_str
+from requests.compat import (
+ Morsel, cookielib, getproxies, str, urljoin, urlparse, is_py3,
+ builtin_str, OrderedDict
+ )
+from requests.cookies import cookiejar_from_dict, morsel_to_cookie
+from requests.exceptions import (ConnectionError, ConnectTimeout,
+ InvalidSchema, InvalidURL, MissingSchema,
+ ReadTimeout, Timeout, RetryError)
+from requests.models import PreparedRequest
+from requests.structures import CaseInsensitiveDict
+from requests.sessions import SessionRedirectMixin
+from requests.models import urlencode
+from requests.hooks import default_hooks
+
+try:
+ import StringIO
+except ImportError:
+ import io as StringIO
+
+try:
+ from multiprocessing.pool import ThreadPool
+except ImportError:
+ ThreadPool = None
+
+if is_py3:
+ def u(s):
+ return s
+else:
+ def u(s):
+ return s.decode('unicode-escape')
+
+
+@pytest.fixture
+def httpbin(httpbin):
+ # Issue #1483: Make sure the URL always has a trailing slash
+ httpbin_url = httpbin.url.rstrip('/') + '/'
+
+ def inner(*suffix):
+ return urljoin(httpbin_url, '/'.join(suffix))
+
+ return inner
+
+
+@pytest.fixture
+def httpsbin_url(httpbin_secure):
+ # Issue #1483: Make sure the URL always has a trailing slash
+ httpbin_url = httpbin_secure.url.rstrip('/') + '/'
+
+ def inner(*suffix):
+ return urljoin(httpbin_url, '/'.join(suffix))
+
+ return inner
+
+
+# Requests to this URL should always fail with a connection timeout (nothing
+# listening on that port)
+TARPIT = "http://10.255.255.1"
+
+class TestRequests(object):
+
+ _multiprocess_can_split_ = True
+
+ def setUp(self):
+ """Create simple data set with headers."""
+ pass
+
+ def tearDown(self):
+ """Teardown."""
+ pass
+
+ def test_entry_points(self):
+
+ requests.session
+ requests.session().get
+ requests.session().head
+ requests.get
+ requests.head
+ requests.put
+ requests.patch
+ requests.post
+
+ def test_invalid_url(self):
+ with pytest.raises(MissingSchema):
+ requests.get('hiwpefhipowhefopw')
+ with pytest.raises(InvalidSchema):
+ requests.get('localhost:3128')
+ with pytest.raises(InvalidSchema):
+ requests.get('localhost.localdomain:3128/')
+ with pytest.raises(InvalidSchema):
+ requests.get('10.122.1.1:3128/')
+ with pytest.raises(InvalidURL):
+ requests.get('http://')
+
+ def test_basic_building(self):
+ req = requests.Request()
+ req.url = 'http://kennethreitz.org/'
+ req.data = {'life': '42'}
+
+ pr = req.prepare()
+ assert pr.url == req.url
+ assert pr.body == 'life=42'
+
+ def test_no_content_length(self, httpbin):
+ get_req = requests.Request('GET', httpbin('get')).prepare()
+ assert 'Content-Length' not in get_req.headers
+ head_req = requests.Request('HEAD', httpbin('head')).prepare()
+ assert 'Content-Length' not in head_req.headers
+
+ def test_override_content_length(self, httpbin):
+ headers = {
+ 'Content-Length': 'not zero'
+ }
+ r = requests.Request('POST', httpbin('post'), headers=headers).prepare()
+ assert 'Content-Length' in r.headers
+ assert r.headers['Content-Length'] == 'not zero'
+
+ def test_path_is_not_double_encoded(self):
+ request = requests.Request('GET', "http://0.0.0.0/get/test case").prepare()
+
+ assert request.path_url == '/get/test%20case'
+
+ def test_params_are_added_before_fragment(self):
+ request = requests.Request('GET',
+ "http://example.com/path#fragment", params={"a": "b"}).prepare()
+ assert request.url == "http://example.com/path?a=b#fragment"
+ request = requests.Request('GET',
+ "http://example.com/path?key=value#fragment", params={"a": "b"}).prepare()
+ assert request.url == "http://example.com/path?key=value&a=b#fragment"
+
+ def test_params_original_order_is_preserved_by_default(self):
+ param_ordered_dict = OrderedDict((('z', 1), ('a', 1), ('k', 1), ('d', 1)))
+ session = requests.Session()
+ request = requests.Request('GET', 'http://example.com/', params=param_ordered_dict)
+ prep = session.prepare_request(request)
+ assert prep.url == 'http://example.com/?z=1&a=1&k=1&d=1'
+
+ def test_params_bytes_are_encoded(self):
+ request = requests.Request('GET', 'http://example.com',
+ params=b'test=foo').prepare()
+ assert request.url == 'http://example.com/?test=foo'
+
+ def test_binary_put(self):
+ request = requests.Request('PUT', 'http://example.com',
+ data=u"ööö".encode("utf-8")).prepare()
+ assert isinstance(request.body, bytes)
+
+ def test_mixed_case_scheme_acceptable(self, httpbin):
+ s = requests.Session()
+ s.proxies = getproxies()
+ parts = urlparse(httpbin('get'))
+ schemes = ['http://', 'HTTP://', 'hTTp://', 'HttP://']
+ for scheme in schemes:
+ url = scheme + parts.netloc + parts.path
+ r = requests.Request('GET', url)
+ r = s.send(r.prepare())
+ assert r.status_code == 200, 'failed for scheme {0}'.format(scheme)
+
+ def test_HTTP_200_OK_GET_ALTERNATIVE(self, httpbin):
+ r = requests.Request('GET', httpbin('get'))
+ s = requests.Session()
+ s.proxies = getproxies()
+
+ r = s.send(r.prepare())
+
+ assert r.status_code == 200
+
+ def test_HTTP_302_ALLOW_REDIRECT_GET(self, httpbin):
+ r = requests.get(httpbin('redirect', '1'))
+ assert r.status_code == 200
+ assert r.history[0].status_code == 302
+ assert r.history[0].is_redirect
+
+ # def test_HTTP_302_ALLOW_REDIRECT_POST(self):
+ # r = requests.post(httpbin('status', '302'), data={'some': 'data'})
+ # self.assertEqual(r.status_code, 200)
+
+ def test_HTTP_200_OK_GET_WITH_PARAMS(self, httpbin):
+ heads = {'User-agent': 'Mozilla/5.0'}
+
+ r = requests.get(httpbin('user-agent'), headers=heads)
+
+ assert heads['User-agent'] in r.text
+ assert r.status_code == 200
+
+ def test_HTTP_200_OK_GET_WITH_MIXED_PARAMS(self, httpbin):
+ heads = {'User-agent': 'Mozilla/5.0'}
+
+ r = requests.get(httpbin('get') + '?test=true', params={'q': 'test'}, headers=heads)
+ assert r.status_code == 200
+
+ def test_set_cookie_on_301(self, httpbin):
+ s = requests.session()
+ url = httpbin('cookies/set?foo=bar')
+ s.get(url)
+ assert s.cookies['foo'] == 'bar'
+
+ def test_cookie_sent_on_redirect(self, httpbin):
+ s = requests.session()
+ s.get(httpbin('cookies/set?foo=bar'))
+ r = s.get(httpbin('redirect/1')) # redirects to httpbin('get')
+ assert 'Cookie' in r.json()['headers']
+
+ def test_cookie_removed_on_expire(self, httpbin):
+ s = requests.session()
+ s.get(httpbin('cookies/set?foo=bar'))
+ assert s.cookies['foo'] == 'bar'
+ s.get(
+ httpbin('response-headers'),
+ params={
+ 'Set-Cookie':
+ 'foo=deleted; expires=Thu, 01-Jan-1970 00:00:01 GMT'
+ }
+ )
+ assert 'foo' not in s.cookies
+
+ def test_cookie_quote_wrapped(self, httpbin):
+ s = requests.session()
+ s.get(httpbin('cookies/set?foo="bar:baz"'))
+ assert s.cookies['foo'] == '"bar:baz"'
+
+ def test_cookie_persists_via_api(self, httpbin):
+ s = requests.session()
+ r = s.get(httpbin('redirect/1'), cookies={'foo': 'bar'})
+ assert 'foo' in r.request.headers['Cookie']
+ assert 'foo' in r.history[0].request.headers['Cookie']
+
+ def test_request_cookie_overrides_session_cookie(self, httpbin):
+ s = requests.session()
+ s.cookies['foo'] = 'bar'
+ r = s.get(httpbin('cookies'), cookies={'foo': 'baz'})
+ assert r.json()['cookies']['foo'] == 'baz'
+ # Session cookie should not be modified
+ assert s.cookies['foo'] == 'bar'
+
+ def test_request_cookies_not_persisted(self, httpbin):
+ s = requests.session()
+ s.get(httpbin('cookies'), cookies={'foo': 'baz'})
+ # Sending a request with cookies should not add cookies to the session
+ assert not s.cookies
+
+ def test_generic_cookiejar_works(self, httpbin):
+ cj = cookielib.CookieJar()
+ cookiejar_from_dict({'foo': 'bar'}, cj)
+ s = requests.session()
+ s.cookies = cj
+ r = s.get(httpbin('cookies'))
+ # Make sure the cookie was sent
+ assert r.json()['cookies']['foo'] == 'bar'
+ # Make sure the session cj is still the custom one
+ assert s.cookies is cj
+
+ def test_param_cookiejar_works(self, httpbin):
+ cj = cookielib.CookieJar()
+ cookiejar_from_dict({'foo': 'bar'}, cj)
+ s = requests.session()
+ r = s.get(httpbin('cookies'), cookies=cj)
+ # Make sure the cookie was sent
+ assert r.json()['cookies']['foo'] == 'bar'
+
+ def test_requests_in_history_are_not_overridden(self, httpbin):
+ resp = requests.get(httpbin('redirect/3'))
+ urls = [r.url for r in resp.history]
+ req_urls = [r.request.url for r in resp.history]
+ assert urls == req_urls
+
+ def test_history_is_always_a_list(self, httpbin):
+ """
+ Show that even with redirects, Response.history is always a list.
+ """
+ resp = requests.get(httpbin('get'))
+ assert isinstance(resp.history, list)
+ resp = requests.get(httpbin('redirect/1'))
+ assert isinstance(resp.history, list)
+ assert not isinstance(resp.history, tuple)
+
+ def test_headers_on_session_with_None_are_not_sent(self, httpbin):
+ """Do not send headers in Session.headers with None values."""
+ ses = requests.Session()
+ ses.headers['Accept-Encoding'] = None
+ req = requests.Request('GET', httpbin('get'))
+ prep = ses.prepare_request(req)
+ assert 'Accept-Encoding' not in prep.headers
+
+ def test_user_agent_transfers(self, httpbin):
+
+ heads = {
+ 'User-agent': 'Mozilla/5.0 (github.com/kennethreitz/requests)'
+ }
+
+ r = requests.get(httpbin('user-agent'), headers=heads)
+ assert heads['User-agent'] in r.text
+
+ heads = {
+ 'user-agent': 'Mozilla/5.0 (github.com/kennethreitz/requests)'
+ }
+
+ r = requests.get(httpbin('user-agent'), headers=heads)
+ assert heads['user-agent'] in r.text
+
+ def test_HTTP_200_OK_HEAD(self, httpbin):
+ r = requests.head(httpbin('get'))
+ assert r.status_code == 200
+
+ def test_HTTP_200_OK_PUT(self, httpbin):
+ r = requests.put(httpbin('put'))
+ assert r.status_code == 200
+
+ def test_BASICAUTH_TUPLE_HTTP_200_OK_GET(self, httpbin):
+ auth = ('user', 'pass')
+ url = httpbin('basic-auth', 'user', 'pass')
+
+ r = requests.get(url, auth=auth)
+ assert r.status_code == 200
+
+ r = requests.get(url)
+ assert r.status_code == 401
+
+ s = requests.session()
+ s.auth = auth
+ r = s.get(url)
+ assert r.status_code == 200
+
+ def test_connection_error_invalid_domain(self):
+ """Connecting to an unknown domain should raise a ConnectionError"""
+ with pytest.raises(ConnectionError):
+ requests.get("http://doesnotexist.google.com")
+
+ def test_connection_error_invalid_port(self):
+ """Connecting to an invalid port should raise a ConnectionError"""
+ with pytest.raises(ConnectionError):
+ requests.get("http://localhost:1", timeout=1)
+
+ def test_LocationParseError(self):
+ """Inputing a URL that cannot be parsed should raise an InvalidURL error"""
+ with pytest.raises(InvalidURL):
+ requests.get("http://fe80::5054:ff:fe5a:fc0")
+
+ def test_basicauth_with_netrc(self, httpbin):
+ auth = ('user', 'pass')
+ wrong_auth = ('wronguser', 'wrongpass')
+ url = httpbin('basic-auth', 'user', 'pass')
+
+ old_auth = requests.sessions.get_netrc_auth
+
+ try:
+ def get_netrc_auth_mock(url):
+ return auth
+ requests.sessions.get_netrc_auth = get_netrc_auth_mock
+
+ # Should use netrc and work.
+ r = requests.get(url)
+ assert r.status_code == 200
+
+ # Given auth should override and fail.
+ r = requests.get(url, auth=wrong_auth)
+ assert r.status_code == 401
+
+ s = requests.session()
+
+ # Should use netrc and work.
+ r = s.get(url)
+ assert r.status_code == 200
+
+ # Given auth should override and fail.
+ s.auth = wrong_auth
+ r = s.get(url)
+ assert r.status_code == 401
+ finally:
+ requests.sessions.get_netrc_auth = old_auth
+
+ def test_DIGEST_HTTP_200_OK_GET(self, httpbin):
+
+ auth = HTTPDigestAuth('user', 'pass')
+ url = httpbin('digest-auth', 'auth', 'user', 'pass')
+
+ r = requests.get(url, auth=auth)
+ assert r.status_code == 200
+
+ r = requests.get(url)
+ assert r.status_code == 401
+
+ s = requests.session()
+ s.auth = HTTPDigestAuth('user', 'pass')
+ r = s.get(url)
+ assert r.status_code == 200
+
+ def test_DIGEST_AUTH_RETURNS_COOKIE(self, httpbin):
+ url = httpbin('digest-auth', 'auth', 'user', 'pass')
+ auth = HTTPDigestAuth('user', 'pass')
+ r = requests.get(url)
+ assert r.cookies['fake'] == 'fake_value'
+
+ r = requests.get(url, auth=auth)
+ assert r.status_code == 200
+
+ def test_DIGEST_AUTH_SETS_SESSION_COOKIES(self, httpbin):
+ url = httpbin('digest-auth', 'auth', 'user', 'pass')
+ auth = HTTPDigestAuth('user', 'pass')
+ s = requests.Session()
+ s.get(url, auth=auth)
+ assert s.cookies['fake'] == 'fake_value'
+
+ def test_DIGEST_STREAM(self, httpbin):
+
+ auth = HTTPDigestAuth('user', 'pass')
+ url = httpbin('digest-auth', 'auth', 'user', 'pass')
+
+ r = requests.get(url, auth=auth, stream=True)
+ assert r.raw.read() != b''
+
+ r = requests.get(url, auth=auth, stream=False)
+ assert r.raw.read() == b''
+
+ def test_DIGESTAUTH_WRONG_HTTP_401_GET(self, httpbin):
+
+ auth = HTTPDigestAuth('user', 'wrongpass')
+ url = httpbin('digest-auth', 'auth', 'user', 'pass')
+
+ r = requests.get(url, auth=auth)
+ assert r.status_code == 401
+
+ r = requests.get(url)
+ assert r.status_code == 401
+
+ s = requests.session()
+ s.auth = auth
+ r = s.get(url)
+ assert r.status_code == 401
+
+ def test_DIGESTAUTH_QUOTES_QOP_VALUE(self, httpbin):
+
+ auth = HTTPDigestAuth('user', 'pass')
+ url = httpbin('digest-auth', 'auth', 'user', 'pass')
+
+ r = requests.get(url, auth=auth)
+ assert '"auth"' in r.request.headers['Authorization']
+
+ def test_POSTBIN_GET_POST_FILES(self, httpbin):
+
+ url = httpbin('post')
+ post1 = requests.post(url).raise_for_status()
+
+ post1 = requests.post(url, data={'some': 'data'})
+ assert post1.status_code == 200
+
+ with open('requirements.txt') as f:
+ post2 = requests.post(url, files={'some': f})
+ assert post2.status_code == 200
+
+ post4 = requests.post(url, data='[{"some": "json"}]')
+ assert post4.status_code == 200
+
+ with pytest.raises(ValueError):
+ requests.post(url, files=['bad file data'])
+
+ def test_POSTBIN_GET_POST_FILES_WITH_DATA(self, httpbin):
+
+ url = httpbin('post')
+ post1 = requests.post(url).raise_for_status()
+
+ post1 = requests.post(url, data={'some': 'data'})
+ assert post1.status_code == 200
+
+ with open('requirements.txt') as f:
+ post2 = requests.post(url,
+ data={'some': 'data'}, files={'some': f})
+ assert post2.status_code == 200
+
+ post4 = requests.post(url, data='[{"some": "json"}]')
+ assert post4.status_code == 200
+
+ with pytest.raises(ValueError):
+ requests.post(url, files=['bad file data'])
+
+ def test_conflicting_post_params(self, httpbin):
+ url = httpbin('post')
+ with open('requirements.txt') as f:
+ pytest.raises(ValueError, "requests.post(url, data='[{\"some\": \"data\"}]', files={'some': f})")
+ pytest.raises(ValueError, "requests.post(url, data=u('[{\"some\": \"data\"}]'), files={'some': f})")
+
+ def test_request_ok_set(self, httpbin):
+ r = requests.get(httpbin('status', '404'))
+ assert not r.ok
+
+ def test_status_raising(self, httpbin):
+ r = requests.get(httpbin('status', '404'))
+ with pytest.raises(requests.exceptions.HTTPError):
+ r.raise_for_status()
+
+ r = requests.get(httpbin('status', '500'))
+ assert not r.ok
+
+ def test_decompress_gzip(self, httpbin):
+ r = requests.get(httpbin('gzip'))
+ r.content.decode('ascii')
+
+ def test_unicode_get(self, httpbin):
+ url = httpbin('/get')
+ requests.get(url, params={'foo': 'føø'})
+ requests.get(url, params={'føø': 'føø'})
+ requests.get(url, params={'føø': 'føø'})
+ requests.get(url, params={'foo': 'foo'})
+ requests.get(httpbin('ø'), params={'foo': 'foo'})
+
+ def test_unicode_header_name(self, httpbin):
+ requests.put(
+ httpbin('put'),
+ headers={str('Content-Type'): 'application/octet-stream'},
+ data='\xff') # compat.str is unicode.
+
+ def test_pyopenssl_redirect(self, httpsbin_url, httpbin_ca_bundle):
+ requests.get(httpsbin_url('status', '301'), verify=httpbin_ca_bundle)
+
+ def test_urlencoded_get_query_multivalued_param(self, httpbin):
+
+ r = requests.get(httpbin('get'), params=dict(test=['foo', 'baz']))
+ assert r.status_code == 200
+ assert r.url == httpbin('get?test=foo&test=baz')
+
+ def test_different_encodings_dont_break_post(self, httpbin):
+ r = requests.post(httpbin('post'),
+ data={'stuff': json.dumps({'a': 123})},
+ params={'blah': 'asdf1234'},
+ files={'file': ('test_requests.py', open(__file__, 'rb'))})
+ assert r.status_code == 200
+
+ def test_unicode_multipart_post(self, httpbin):
+ r = requests.post(httpbin('post'),
+ data={'stuff': u('ëlïxr')},
+ files={'file': ('test_requests.py', open(__file__, 'rb'))})
+ assert r.status_code == 200
+
+ r = requests.post(httpbin('post'),
+ data={'stuff': u('ëlïxr').encode('utf-8')},
+ files={'file': ('test_requests.py', open(__file__, 'rb'))})
+ assert r.status_code == 200
+
+ r = requests.post(httpbin('post'),
+ data={'stuff': 'elixr'},
+ files={'file': ('test_requests.py', open(__file__, 'rb'))})
+ assert r.status_code == 200
+
+ r = requests.post(httpbin('post'),
+ data={'stuff': 'elixr'.encode('utf-8')},
+ files={'file': ('test_requests.py', open(__file__, 'rb'))})
+ assert r.status_code == 200
+
+ def test_unicode_multipart_post_fieldnames(self, httpbin):
+ filename = os.path.splitext(__file__)[0] + '.py'
+ r = requests.Request(method='POST',
+ url=httpbin('post'),
+ data={'stuff'.encode('utf-8'): 'elixr'},
+ files={'file': ('test_requests.py',
+ open(filename, 'rb'))})
+ prep = r.prepare()
+ assert b'name="stuff"' in prep.body
+ assert b'name="b\'stuff\'"' not in prep.body
+
+ def test_unicode_method_name(self, httpbin):
+ files = {'file': open('test_requests.py', 'rb')}
+ r = requests.request(
+ method=u('POST'), url=httpbin('post'), files=files)
+ assert r.status_code == 200
+
+ def test_unicode_method_name_with_request_object(self, httpbin):
+ files = {'file': open('test_requests.py', 'rb')}
+ s = requests.Session()
+ req = requests.Request(u("POST"), httpbin('post'), files=files)
+ prep = s.prepare_request(req)
+ assert isinstance(prep.method, builtin_str)
+ assert prep.method == "POST"
+
+ resp = s.send(prep)
+ assert resp.status_code == 200
+
+ def test_custom_content_type(self, httpbin):
+ r = requests.post(
+ httpbin('post'),
+ data={'stuff': json.dumps({'a': 123})},
+ files={'file1': ('test_requests.py', open(__file__, 'rb')),
+ 'file2': ('test_requests', open(__file__, 'rb'),
+ 'text/py-content-type')})
+ assert r.status_code == 200
+ assert b"text/py-content-type" in r.request.body
+
+ def test_hook_receives_request_arguments(self, httpbin):
+ def hook(resp, **kwargs):
+ assert resp is not None
+ assert kwargs != {}
+
+ requests.Request('GET', httpbin(), hooks={'response': hook})
+
+ def test_session_hooks_are_used_with_no_request_hooks(self, httpbin):
+ hook = lambda x, *args, **kwargs: x
+ s = requests.Session()
+ s.hooks['response'].append(hook)
+ r = requests.Request('GET', httpbin())
+ prep = s.prepare_request(r)
+ assert prep.hooks['response'] != []
+ assert prep.hooks['response'] == [hook]
+
+ def test_session_hooks_are_overridden_by_request_hooks(self, httpbin):
+ hook1 = lambda x, *args, **kwargs: x
+ hook2 = lambda x, *args, **kwargs: x
+ assert hook1 is not hook2
+ s = requests.Session()
+ s.hooks['response'].append(hook2)
+ r = requests.Request('GET', httpbin(), hooks={'response': [hook1]})
+ prep = s.prepare_request(r)
+ assert prep.hooks['response'] == [hook1]
+
+ def test_prepared_request_hook(self, httpbin):
+ def hook(resp, **kwargs):
+ resp.hook_working = True
+ return resp
+
+ req = requests.Request('GET', httpbin(), hooks={'response': hook})
+ prep = req.prepare()
+
+ s = requests.Session()
+ s.proxies = getproxies()
+ resp = s.send(prep)
+
+ assert hasattr(resp, 'hook_working')
+
+ def test_prepared_from_session(self, httpbin):
+ class DummyAuth(requests.auth.AuthBase):
+ def __call__(self, r):
+ r.headers['Dummy-Auth-Test'] = 'dummy-auth-test-ok'
+ return r
+
+ req = requests.Request('GET', httpbin('headers'))
+ assert not req.auth
+
+ s = requests.Session()
+ s.auth = DummyAuth()
+
+ prep = s.prepare_request(req)
+ resp = s.send(prep)
+
+ assert resp.json()['headers'][
+ 'Dummy-Auth-Test'] == 'dummy-auth-test-ok'
+
+ def test_prepare_request_with_bytestring_url(self):
+ req = requests.Request('GET', b'https://httpbin.org/')
+ s = requests.Session()
+ prep = s.prepare_request(req)
+ assert prep.url == "https://httpbin.org/"
+
+ def test_links(self):
+ r = requests.Response()
+ r.headers = {
+ 'cache-control': 'public, max-age=60, s-maxage=60',
+ 'connection': 'keep-alive',
+ 'content-encoding': 'gzip',
+ 'content-type': 'application/json; charset=utf-8',
+ 'date': 'Sat, 26 Jan 2013 16:47:56 GMT',
+ 'etag': '"6ff6a73c0e446c1f61614769e3ceb778"',
+ 'last-modified': 'Sat, 26 Jan 2013 16:22:39 GMT',
+ 'link': ('<https://api.github.com/users/kennethreitz/repos?'
+ 'page=2&per_page=10>; rel="next", <https://api.github.'
+ 'com/users/kennethreitz/repos?page=7&per_page=10>; '
+ ' rel="last"'),
+ 'server': 'GitHub.com',
+ 'status': '200 OK',
+ 'vary': 'Accept',
+ 'x-content-type-options': 'nosniff',
+ 'x-github-media-type': 'github.beta',
+ 'x-ratelimit-limit': '60',
+ 'x-ratelimit-remaining': '57'
+ }
+ assert r.links['next']['rel'] == 'next'
+
+ def test_cookie_parameters(self):
+ key = 'some_cookie'
+ value = 'some_value'
+ secure = True
+ domain = 'test.com'
+ rest = {'HttpOnly': True}
+
+ jar = requests.cookies.RequestsCookieJar()
+ jar.set(key, value, secure=secure, domain=domain, rest=rest)
+
+ assert len(jar) == 1
+ assert 'some_cookie' in jar
+
+ cookie = list(jar)[0]
+ assert cookie.secure == secure
+ assert cookie.domain == domain
+ assert cookie._rest['HttpOnly'] == rest['HttpOnly']
+
+ def test_cookie_as_dict_keeps_len(self):
+ key = 'some_cookie'
+ value = 'some_value'
+
+ key1 = 'some_cookie1'
+ value1 = 'some_value1'
+
+ jar = requests.cookies.RequestsCookieJar()
+ jar.set(key, value)
+ jar.set(key1, value1)
+
+ d1 = dict(jar)
+ d2 = dict(jar.iteritems())
+ d3 = dict(jar.items())
+
+ assert len(jar) == 2
+ assert len(d1) == 2
+ assert len(d2) == 2
+ assert len(d3) == 2
+
+ def test_cookie_as_dict_keeps_items(self):
+ key = 'some_cookie'
+ value = 'some_value'
+
+ key1 = 'some_cookie1'
+ value1 = 'some_value1'
+
+ jar = requests.cookies.RequestsCookieJar()
+ jar.set(key, value)
+ jar.set(key1, value1)
+
+ d1 = dict(jar)
+ d2 = dict(jar.iteritems())
+ d3 = dict(jar.items())
+
+ assert d1['some_cookie'] == 'some_value'
+ assert d2['some_cookie'] == 'some_value'
+ assert d3['some_cookie1'] == 'some_value1'
+
+ def test_cookie_as_dict_keys(self):
+ key = 'some_cookie'
+ value = 'some_value'
+
+ key1 = 'some_cookie1'
+ value1 = 'some_value1'
+
+ jar = requests.cookies.RequestsCookieJar()
+ jar.set(key, value)
+ jar.set(key1, value1)
+
+ keys = jar.keys()
+ assert keys == list(keys)
+ # make sure one can use keys multiple times
+ assert list(keys) == list(keys)
+
+ def test_cookie_as_dict_values(self):
+ key = 'some_cookie'
+ value = 'some_value'
+
+ key1 = 'some_cookie1'
+ value1 = 'some_value1'
+
+ jar = requests.cookies.RequestsCookieJar()
+ jar.set(key, value)
+ jar.set(key1, value1)
+
+ values = jar.values()
+ assert values == list(values)
+ # make sure one can use values multiple times
+ assert list(values) == list(values)
+
+ def test_cookie_as_dict_items(self):
+ key = 'some_cookie'
+ value = 'some_value'
+
+ key1 = 'some_cookie1'
+ value1 = 'some_value1'
+
+ jar = requests.cookies.RequestsCookieJar()
+ jar.set(key, value)
+ jar.set(key1, value1)
+
+ items = jar.items()
+ assert items == list(items)
+ # make sure one can use items multiple times
+ assert list(items) == list(items)
+
+ def test_time_elapsed_blank(self, httpbin):
+ r = requests.get(httpbin('get'))
+ td = r.elapsed
+ total_seconds = ((td.microseconds + (td.seconds + td.days * 24 * 3600)
+ * 10**6) / 10**6)
+ assert total_seconds > 0.0
+
+ def test_response_is_iterable(self):
+ r = requests.Response()
+ io = StringIO.StringIO('abc')
+ read_ = io.read
+
+ def read_mock(amt, decode_content=None):
+ return read_(amt)
+ setattr(io, 'read', read_mock)
+ r.raw = io
+ assert next(iter(r))
+ io.close()
+
+ def test_response_decode_unicode(self):
+ """
+ When called with decode_unicode, Response.iter_content should always
+ return unicode.
+ """
+ r = requests.Response()
+ r._content_consumed = True
+ r._content = b'the content'
+ r.encoding = 'ascii'
+
+ chunks = r.iter_content(decode_unicode=True)
+ assert all(isinstance(chunk, str) for chunk in chunks)
+
+ # also for streaming
+ r = requests.Response()
+ r.raw = io.BytesIO(b'the content')
+ r.encoding = 'ascii'
+ chunks = r.iter_content(decode_unicode=True)
+ assert all(isinstance(chunk, str) for chunk in chunks)
+
+ def test_request_and_response_are_pickleable(self, httpbin):
+ r = requests.get(httpbin('get'))
+
+ # verify we can pickle the original request
+ assert pickle.loads(pickle.dumps(r.request))
+
+ # verify we can pickle the response and that we have access to
+ # the original request.
+ pr = pickle.loads(pickle.dumps(r))
+ assert r.request.url == pr.request.url
+ assert r.request.headers == pr.request.headers
+
+ def test_get_auth_from_url(self):
+ url = 'http://user:pass@complex.url.com/path?query=yes'
+ assert ('user', 'pass') == requests.utils.get_auth_from_url(url)
+
+ def test_get_auth_from_url_encoded_spaces(self):
+ url = 'http://user:pass%20pass@complex.url.com/path?query=yes'
+ assert ('user', 'pass pass') == requests.utils.get_auth_from_url(url)
+
+ def test_get_auth_from_url_not_encoded_spaces(self):
+ url = 'http://user:pass pass@complex.url.com/path?query=yes'
+ assert ('user', 'pass pass') == requests.utils.get_auth_from_url(url)
+
+ def test_get_auth_from_url_percent_chars(self):
+ url = 'http://user%25user:pass@complex.url.com/path?query=yes'
+ assert ('user%user', 'pass') == requests.utils.get_auth_from_url(url)
+
+ def test_get_auth_from_url_encoded_hashes(self):
+ url = 'http://user:pass%23pass@complex.url.com/path?query=yes'
+ assert ('user', 'pass#pass') == requests.utils.get_auth_from_url(url)
+
+ def test_cannot_send_unprepared_requests(self, httpbin):
+ r = requests.Request(url=httpbin())
+ with pytest.raises(ValueError):
+ requests.Session().send(r)
+
+ def test_http_error(self):
+ error = requests.exceptions.HTTPError()
+ assert not error.response
+ response = requests.Response()
+ error = requests.exceptions.HTTPError(response=response)
+ assert error.response == response
+ error = requests.exceptions.HTTPError('message', response=response)
+ assert str(error) == 'message'
+ assert error.response == response
+
+ def test_session_pickling(self, httpbin):
+ r = requests.Request('GET', httpbin('get'))
+ s = requests.Session()
+
+ s = pickle.loads(pickle.dumps(s))
+ s.proxies = getproxies()
+
+ r = s.send(r.prepare())
+ assert r.status_code == 200
+
+ def test_fixes_1329(self, httpbin):
+ """
+ Ensure that header updates are done case-insensitively.
+ """
+ s = requests.Session()
+ s.headers.update({'ACCEPT': 'BOGUS'})
+ s.headers.update({'accept': 'application/json'})
+ r = s.get(httpbin('get'))
+ headers = r.request.headers
+ assert headers['accept'] == 'application/json'
+ assert headers['Accept'] == 'application/json'
+ assert headers['ACCEPT'] == 'application/json'
+
+ def test_uppercase_scheme_redirect(self, httpbin):
+ parts = urlparse(httpbin('html'))
+ url = "HTTP://" + parts.netloc + parts.path
+ r = requests.get(httpbin('redirect-to'), params={'url': url})
+ assert r.status_code == 200
+ assert r.url.lower() == url.lower()
+
+ def test_transport_adapter_ordering(self):
+ s = requests.Session()
+ order = ['https://', 'http://']
+ assert order == list(s.adapters)
+ s.mount('http://git', HTTPAdapter())
+ s.mount('http://github', HTTPAdapter())
+ s.mount('http://github.com', HTTPAdapter())
+ s.mount('http://github.com/about/', HTTPAdapter())
+ order = [
+ 'http://github.com/about/',
+ 'http://github.com',
+ 'http://github',
+ 'http://git',
+ 'https://',
+ 'http://',
+ ]
+ assert order == list(s.adapters)
+ s.mount('http://gittip', HTTPAdapter())
+ s.mount('http://gittip.com', HTTPAdapter())
+ s.mount('http://gittip.com/about/', HTTPAdapter())
+ order = [
+ 'http://github.com/about/',
+ 'http://gittip.com/about/',
+ 'http://github.com',
+ 'http://gittip.com',
+ 'http://github',
+ 'http://gittip',
+ 'http://git',
+ 'https://',
+ 'http://',
+ ]
+ assert order == list(s.adapters)
+ s2 = requests.Session()
+ s2.adapters = {'http://': HTTPAdapter()}
+ s2.mount('https://', HTTPAdapter())
+ assert 'http://' in s2.adapters
+ assert 'https://' in s2.adapters
+
+ def test_header_remove_is_case_insensitive(self, httpbin):
+ # From issue #1321
+ s = requests.Session()
+ s.headers['foo'] = 'bar'
+ r = s.get(httpbin('get'), headers={'FOO': None})
+ assert 'foo' not in r.request.headers
+
+ def test_params_are_merged_case_sensitive(self, httpbin):
+ s = requests.Session()
+ s.params['foo'] = 'bar'
+ r = s.get(httpbin('get'), params={'FOO': 'bar'})
+ assert r.json()['args'] == {'foo': 'bar', 'FOO': 'bar'}
+
+ def test_long_authinfo_in_url(self):
+ url = 'http://{0}:{1}@{2}:9000/path?query#frag'.format(
+ 'E8A3BE87-9E3F-4620-8858-95478E385B5B',
+ 'EA770032-DA4D-4D84-8CE9-29C6D910BF1E',
+ 'exactly-------------sixty-----------three------------characters',
+ )
+ r = requests.Request('GET', url).prepare()
+ assert r.url == url
+
+ def test_header_keys_are_native(self, httpbin):
+ headers = {u('unicode'): 'blah', 'byte'.encode('ascii'): 'blah'}
+ r = requests.Request('GET', httpbin('get'), headers=headers)
+ p = r.prepare()
+
+ # This is testing that they are builtin strings. A bit weird, but there
+ # we go.
+ assert 'unicode' in p.headers.keys()
+ assert 'byte' in p.headers.keys()
+
+ def test_can_send_nonstring_objects_with_files(self, httpbin):
+ data = {'a': 0.0}
+ files = {'b': 'foo'}
+ r = requests.Request('POST', httpbin('post'), data=data, files=files)
+ p = r.prepare()
+
+ assert 'multipart/form-data' in p.headers['Content-Type']
+
+ def test_can_send_bytes_bytearray_objects_with_files(self, httpbin):
+ # Test bytes:
+ data = {'a': 'this is a string'}
+ files = {'b': b'foo'}
+ r = requests.Request('POST', httpbin('post'), data=data, files=files)
+ p = r.prepare()
+ assert 'multipart/form-data' in p.headers['Content-Type']
+ # Test bytearrays:
+ files = {'b': bytearray(b'foo')}
+ r = requests.Request('POST', httpbin('post'), data=data, files=files)
+ p = r.prepare()
+ assert 'multipart/form-data' in p.headers['Content-Type']
+
+ def test_can_send_file_object_with_non_string_filename(self, httpbin):
+ f = io.BytesIO()
+ f.name = 2
+ r = requests.Request('POST', httpbin('post'), files={'f': f})
+ p = r.prepare()
+
+ assert 'multipart/form-data' in p.headers['Content-Type']
+
+ def test_autoset_header_values_are_native(self, httpbin):
+ data = 'this is a string'
+ length = '16'
+ req = requests.Request('POST', httpbin('post'), data=data)
+ p = req.prepare()
+
+ assert p.headers['Content-Length'] == length
+
+ def test_nonhttp_schemes_dont_check_URLs(self):
+ test_urls = (
+ '',
+ 'file:///etc/passwd',
+ 'magnet:?xt=urn:btih:be08f00302bc2d1d3cfa3af02024fa647a271431',
+ )
+ for test_url in test_urls:
+ req = requests.Request('GET', test_url)
+ preq = req.prepare()
+ assert test_url == preq.url
+
+ def test_auth_is_stripped_on_redirect_off_host(self, httpbin):
+ r = requests.get(
+ httpbin('redirect-to'),
+ params={'url': 'http://www.google.co.uk'},
+ auth=('user', 'pass'),
+ )
+ assert r.history[0].request.headers['Authorization']
+ assert not r.request.headers.get('Authorization', '')
+
+ def test_auth_is_retained_for_redirect_on_host(self, httpbin):
+ r = requests.get(httpbin('redirect/1'), auth=('user', 'pass'))
+ h1 = r.history[0].request.headers['Authorization']
+ h2 = r.request.headers['Authorization']
+
+ assert h1 == h2
+
+ def test_manual_redirect_with_partial_body_read(self, httpbin):
+ s = requests.Session()
+ r1 = s.get(httpbin('redirect/2'), allow_redirects=False, stream=True)
+ assert r1.is_redirect
+ rg = s.resolve_redirects(r1, r1.request, stream=True)
+
+ # read only the first eight bytes of the response body,
+ # then follow the redirect
+ r1.iter_content(8)
+ r2 = next(rg)
+ assert r2.is_redirect
+
+ # read all of the response via iter_content,
+ # then follow the redirect
+ for _ in r2.iter_content():
+ pass
+ r3 = next(rg)
+ assert not r3.is_redirect
+
+ def _patch_adapter_gzipped_redirect(self, session, url):
+ adapter = session.get_adapter(url=url)
+ org_build_response = adapter.build_response
+ self._patched_response = False
+
+ def build_response(*args, **kwargs):
+ resp = org_build_response(*args, **kwargs)
+ if not self._patched_response:
+ resp.raw.headers['content-encoding'] = 'gzip'
+ self._patched_response = True
+ return resp
+
+ adapter.build_response = build_response
+
+ def test_redirect_with_wrong_gzipped_header(self, httpbin):
+ s = requests.Session()
+ url = httpbin('redirect/1')
+ self._patch_adapter_gzipped_redirect(s, url)
+ s.get(url)
+
+ def test_basic_auth_str_is_always_native(self):
+ s = _basic_auth_str("test", "test")
+ assert isinstance(s, builtin_str)
+ assert s == "Basic dGVzdDp0ZXN0"
+
+ def test_requests_history_is_saved(self, httpbin):
+ r = requests.get(httpbin('redirect/5'))
+ total = r.history[-1].history
+ i = 0
+ for item in r.history:
+ assert item.history == total[0:i]
+ i = i + 1
+
+ def test_json_param_post_content_type_works(self, httpbin):
+ r = requests.post(
+ httpbin('post'),
+ json={'life': 42}
+ )
+ assert r.status_code == 200
+ assert 'application/json' in r.request.headers['Content-Type']
+ assert {'life': 42} == r.json()['json']
+
+ def test_json_param_post_should_not_override_data_param(self, httpbin):
+ r = requests.Request(method='POST', url=httpbin('post'),
+ data={'stuff': 'elixr'},
+ json={'music': 'flute'})
+ prep = r.prepare()
+ assert 'stuff=elixr' == prep.body
+
+ def test_response_iter_lines(self, httpbin):
+ r = requests.get(httpbin('stream/4'), stream=True)
+ assert r.status_code == 200
+
+ it = r.iter_lines()
+ next(it)
+ assert len(list(it)) == 3
+
+ def test_unconsumed_session_response_closes_connection(self, httpbin):
+ s = requests.session()
+
+ with contextlib.closing(s.get(httpbin('stream/4'), stream=True)) as response:
+ pass
+
+ assert response._content_consumed is False
+ assert response.raw.closed
+
+ @pytest.mark.xfail
+ def test_response_iter_lines_reentrant(self, httpbin):
+ """Response.iter_lines() is not reentrant safe"""
+ r = requests.get(httpbin('stream/4'), stream=True)
+ assert r.status_code == 200
+
+ next(r.iter_lines())
+ assert len(list(r.iter_lines())) == 3
+
+
+class TestContentEncodingDetection(unittest.TestCase):
+
+ def test_none(self):
+ encodings = requests.utils.get_encodings_from_content('')
+ assert not len(encodings)
+
+ def test_html_charset(self):
+ """HTML5 meta charset attribute"""
+ content = '<meta charset="UTF-8">'
+ encodings = requests.utils.get_encodings_from_content(content)
+ assert len(encodings) == 1
+ assert encodings[0] == 'UTF-8'
+
+ def test_html4_pragma(self):
+ """HTML4 pragma directive"""
+ content = '<meta http-equiv="Content-type" content="text/html;charset=UTF-8">'
+ encodings = requests.utils.get_encodings_from_content(content)
+ assert len(encodings) == 1
+ assert encodings[0] == 'UTF-8'
+
+ def test_xhtml_pragma(self):
+ """XHTML 1.x served with text/html MIME type"""
+ content = '<meta http-equiv="Content-type" content="text/html;charset=UTF-8" />'
+ encodings = requests.utils.get_encodings_from_content(content)
+ assert len(encodings) == 1
+ assert encodings[0] == 'UTF-8'
+
+ def test_xml(self):
+ """XHTML 1.x served as XML"""
+ content = '<?xml version="1.0" encoding="UTF-8"?>'
+ encodings = requests.utils.get_encodings_from_content(content)
+ assert len(encodings) == 1
+ assert encodings[0] == 'UTF-8'
+
+ def test_precedence(self):
+ content = '''
+ <?xml version="1.0" encoding="XML"?>
+ <meta charset="HTML5">
+ <meta http-equiv="Content-type" content="text/html;charset=HTML4" />
+ '''.strip()
+ encodings = requests.utils.get_encodings_from_content(content)
+ assert encodings == ['HTML5', 'HTML4', 'XML']
+
+
+class TestCaseInsensitiveDict(unittest.TestCase):
+
+ def test_mapping_init(self):
+ cid = CaseInsensitiveDict({'Foo': 'foo', 'BAr': 'bar'})
+ assert len(cid) == 2
+ assert 'foo' in cid
+ assert 'bar' in cid
+
+ def test_iterable_init(self):
+ cid = CaseInsensitiveDict([('Foo', 'foo'), ('BAr', 'bar')])
+ assert len(cid) == 2
+ assert 'foo' in cid
+ assert 'bar' in cid
+
+ def test_kwargs_init(self):
+ cid = CaseInsensitiveDict(FOO='foo', BAr='bar')
+ assert len(cid) == 2
+ assert 'foo' in cid
+ assert 'bar' in cid
+
+ def test_docstring_example(self):
+ cid = CaseInsensitiveDict()
+ cid['Accept'] = 'application/json'
+ assert cid['aCCEPT'] == 'application/json'
+ assert list(cid) == ['Accept']
+
+ def test_len(self):
+ cid = CaseInsensitiveDict({'a': 'a', 'b': 'b'})
+ cid['A'] = 'a'
+ assert len(cid) == 2
+
+ def test_getitem(self):
+ cid = CaseInsensitiveDict({'Spam': 'blueval'})
+ assert cid['spam'] == 'blueval'
+ assert cid['SPAM'] == 'blueval'
+
+ def test_fixes_649(self):
+ """__setitem__ should behave case-insensitively."""
+ cid = CaseInsensitiveDict()
+ cid['spam'] = 'oneval'
+ cid['Spam'] = 'twoval'
+ cid['sPAM'] = 'redval'
+ cid['SPAM'] = 'blueval'
+ assert cid['spam'] == 'blueval'
+ assert cid['SPAM'] == 'blueval'
+ assert list(cid.keys()) == ['SPAM']
+
+ def test_delitem(self):
+ cid = CaseInsensitiveDict()
+ cid['Spam'] = 'someval'
+ del cid['sPam']
+ assert 'spam' not in cid
+ assert len(cid) == 0
+
+ def test_contains(self):
+ cid = CaseInsensitiveDict()
+ cid['Spam'] = 'someval'
+ assert 'Spam' in cid
+ assert 'spam' in cid
+ assert 'SPAM' in cid
+ assert 'sPam' in cid
+ assert 'notspam' not in cid
+
+ def test_get(self):
+ cid = CaseInsensitiveDict()
+ cid['spam'] = 'oneval'
+ cid['SPAM'] = 'blueval'
+ assert cid.get('spam') == 'blueval'
+ assert cid.get('SPAM') == 'blueval'
+ assert cid.get('sPam') == 'blueval'
+ assert cid.get('notspam', 'default') == 'default'
+
+ def test_update(self):
+ cid = CaseInsensitiveDict()
+ cid['spam'] = 'blueval'
+ cid.update({'sPam': 'notblueval'})
+ assert cid['spam'] == 'notblueval'
+ cid = CaseInsensitiveDict({'Foo': 'foo', 'BAr': 'bar'})
+ cid.update({'fOO': 'anotherfoo', 'bAR': 'anotherbar'})
+ assert len(cid) == 2
+ assert cid['foo'] == 'anotherfoo'
+ assert cid['bar'] == 'anotherbar'
+
+ def test_update_retains_unchanged(self):
+ cid = CaseInsensitiveDict({'foo': 'foo', 'bar': 'bar'})
+ cid.update({'foo': 'newfoo'})
+ assert cid['bar'] == 'bar'
+
+ def test_iter(self):
+ cid = CaseInsensitiveDict({'Spam': 'spam', 'Eggs': 'eggs'})
+ keys = frozenset(['Spam', 'Eggs'])
+ assert frozenset(iter(cid)) == keys
+
+ def test_equality(self):
+ cid = CaseInsensitiveDict({'SPAM': 'blueval', 'Eggs': 'redval'})
+ othercid = CaseInsensitiveDict({'spam': 'blueval', 'eggs': 'redval'})
+ assert cid == othercid
+ del othercid['spam']
+ assert cid != othercid
+ assert cid == {'spam': 'blueval', 'eggs': 'redval'}
+ assert cid != object()
+
+ def test_setdefault(self):
+ cid = CaseInsensitiveDict({'Spam': 'blueval'})
+ assert cid.setdefault('spam', 'notblueval') == 'blueval'
+ assert cid.setdefault('notspam', 'notblueval') == 'notblueval'
+
+ def test_lower_items(self):
+ cid = CaseInsensitiveDict({
+ 'Accept': 'application/json',
+ 'user-Agent': 'requests',
+ })
+ keyset = frozenset(lowerkey for lowerkey, v in cid.lower_items())
+ lowerkeyset = frozenset(['accept', 'user-agent'])
+ assert keyset == lowerkeyset
+
+ def test_preserve_key_case(self):
+ cid = CaseInsensitiveDict({
+ 'Accept': 'application/json',
+ 'user-Agent': 'requests',
+ })
+ keyset = frozenset(['Accept', 'user-Agent'])
+ assert frozenset(i[0] for i in cid.items()) == keyset
+ assert frozenset(cid.keys()) == keyset
+ assert frozenset(cid) == keyset
+
+ def test_preserve_last_key_case(self):
+ cid = CaseInsensitiveDict({
+ 'Accept': 'application/json',
+ 'user-Agent': 'requests',
+ })
+ cid.update({'ACCEPT': 'application/json'})
+ cid['USER-AGENT'] = 'requests'
+ keyset = frozenset(['ACCEPT', 'USER-AGENT'])
+ assert frozenset(i[0] for i in cid.items()) == keyset
+ assert frozenset(cid.keys()) == keyset
+ assert frozenset(cid) == keyset
+
+ def test_copy(self):
+ cid = CaseInsensitiveDict({
+ 'Accept': 'application/json',
+ 'user-Agent': 'requests',
+ })
+ cid_copy = cid.copy()
+ assert cid == cid_copy
+ cid['changed'] = True
+ assert cid != cid_copy
+
+
+class UtilsTestCase(unittest.TestCase):
+
+ def test_super_len_io_streams(self):
+ """ Ensures that we properly deal with different kinds of IO streams. """
+ # uses StringIO or io.StringIO (see import above)
+ from io import BytesIO
+ from requests.utils import super_len
+
+ assert super_len(StringIO.StringIO()) == 0
+ assert super_len(
+ StringIO.StringIO('with so much drama in the LBC')) == 29
+
+ assert super_len(BytesIO()) == 0
+ assert super_len(
+ BytesIO(b"it's kinda hard bein' snoop d-o-double-g")) == 40
+
+ try:
+ import cStringIO
+ except ImportError:
+ pass
+ else:
+ assert super_len(
+ cStringIO.StringIO('but some how, some way...')) == 25
+
+ def test_super_len_correctly_calculates_len_of_partially_read_file(self):
+ """Ensure that we handle partially consumed file like objects."""
+ from requests.utils import super_len
+ s = StringIO.StringIO()
+ s.write('foobarbogus')
+ assert super_len(s) == 0
+
+ def test_get_environ_proxies_ip_ranges(self):
+ """Ensures that IP addresses are correctly matches with ranges
+ in no_proxy variable."""
+ from requests.utils import get_environ_proxies
+ os.environ['no_proxy'] = "192.168.0.0/24,127.0.0.1,localhost.localdomain,172.16.1.1"
+ assert get_environ_proxies('http://192.168.0.1:5000/') == {}
+ assert get_environ_proxies('http://192.168.0.1/') == {}
+ assert get_environ_proxies('http://172.16.1.1/') == {}
+ assert get_environ_proxies('http://172.16.1.1:5000/') == {}
+ assert get_environ_proxies('http://192.168.1.1:5000/') != {}
+ assert get_environ_proxies('http://192.168.1.1/') != {}
+
+ def test_get_environ_proxies(self):
+ """Ensures that IP addresses are correctly matches with ranges
+ in no_proxy variable."""
+ from requests.utils import get_environ_proxies
+ os.environ['no_proxy'] = "127.0.0.1,localhost.localdomain,192.168.0.0/24,172.16.1.1"
+ assert get_environ_proxies(
+ 'http://localhost.localdomain:5000/v1.0/') == {}
+ assert get_environ_proxies('http://www.requests.com/') != {}
+
+ def test_select_proxies(self):
+ """Make sure we can select per-host proxies correctly."""
+ from requests.utils import select_proxy
+ proxies = {'http': 'http://http.proxy',
+ 'http://some.host': 'http://some.host.proxy'}
+ assert select_proxy('hTTp://u:p@Some.Host/path', proxies) == 'http://some.host.proxy'
+ assert select_proxy('hTTp://u:p@Other.Host/path', proxies) == 'http://http.proxy'
+ assert select_proxy('hTTps://Other.Host', proxies) is None
+
+ def test_guess_filename_when_int(self):
+ from requests.utils import guess_filename
+ assert None is guess_filename(1)
+
+ def test_guess_filename_when_filename_is_an_int(self):
+ from requests.utils import guess_filename
+ fake = type('Fake', (object,), {'name': 1})()
+ assert None is guess_filename(fake)
+
+ def test_guess_filename_with_file_like_obj(self):
+ from requests.utils import guess_filename
+ from requests import compat
+ fake = type('Fake', (object,), {'name': b'value'})()
+ guessed_name = guess_filename(fake)
+ assert b'value' == guessed_name
+ assert isinstance(guessed_name, compat.bytes)
+
+ def test_guess_filename_with_unicode_name(self):
+ from requests.utils import guess_filename
+ from requests import compat
+ filename = b'value'.decode('utf-8')
+ fake = type('Fake', (object,), {'name': filename})()
+ guessed_name = guess_filename(fake)
+ assert filename == guessed_name
+ assert isinstance(guessed_name, compat.str)
+
+ def test_is_ipv4_address(self):
+ from requests.utils import is_ipv4_address
+ assert is_ipv4_address('8.8.8.8')
+ assert not is_ipv4_address('8.8.8.8.8')
+ assert not is_ipv4_address('localhost.localdomain')
+
+ def test_is_valid_cidr(self):
+ from requests.utils import is_valid_cidr
+ assert not is_valid_cidr('8.8.8.8')
+ assert is_valid_cidr('192.168.1.0/24')
+
+ def test_dotted_netmask(self):
+ from requests.utils import dotted_netmask
+ assert dotted_netmask(8) == '255.0.0.0'
+ assert dotted_netmask(24) == '255.255.255.0'
+ assert dotted_netmask(25) == '255.255.255.128'
+
+ def test_address_in_network(self):
+ from requests.utils import address_in_network
+ assert address_in_network('192.168.1.1', '192.168.1.0/24')
+ assert not address_in_network('172.16.0.1', '192.168.1.0/24')
+
+ def test_get_auth_from_url(self):
+ """Ensures that username and password in well-encoded URI as per
+ RFC 3986 are correclty extracted."""
+ from requests.utils import get_auth_from_url
+ from requests.compat import quote
+ percent_encoding_test_chars = "%!*'();:@&=+$,/?#[] "
+ url_address = "request.com/url.html#test"
+ url = "http://" + quote(
+ percent_encoding_test_chars, '') + ':' + quote(
+ percent_encoding_test_chars, '') + '@' + url_address
+ (username, password) = get_auth_from_url(url)
+ assert username == percent_encoding_test_chars
+ assert password == percent_encoding_test_chars
+
+ def test_requote_uri_with_unquoted_percents(self):
+ """Ensure we handle unquoted percent signs in redirects.
+
+ See: https://github.com/kennethreitz/requests/issues/2356
+ """
+ from requests.utils import requote_uri
+ bad_uri = 'http://example.com/fiz?buz=%ppicture'
+ quoted = 'http://example.com/fiz?buz=%25ppicture'
+ assert quoted == requote_uri(bad_uri)
+
+ def test_requote_uri_properly_requotes(self):
+ """Ensure requoting doesn't break expectations."""
+ from requests.utils import requote_uri
+ quoted = 'http://example.com/fiz?buz=%25ppicture'
+ assert quoted == requote_uri(quoted)
+
+
+class TestMorselToCookieExpires(unittest.TestCase):
+
+ """Tests for morsel_to_cookie when morsel contains expires."""
+
+ def test_expires_valid_str(self):
+ """Test case where we convert expires from string time."""
+
+ morsel = Morsel()
+ morsel['expires'] = 'Thu, 01-Jan-1970 00:00:01 GMT'
+ cookie = morsel_to_cookie(morsel)
+ assert cookie.expires == 1
+
+ def test_expires_invalid_int(self):
+ """Test case where an invalid type is passed for expires."""
+
+ morsel = Morsel()
+ morsel['expires'] = 100
+ with pytest.raises(TypeError):
+ morsel_to_cookie(morsel)
+
+ def test_expires_invalid_str(self):
+ """Test case where an invalid string is input."""
+
+ morsel = Morsel()
+ morsel['expires'] = 'woops'
+ with pytest.raises(ValueError):
+ morsel_to_cookie(morsel)
+
+ def test_expires_none(self):
+ """Test case where expires is None."""
+
+ morsel = Morsel()
+ morsel['expires'] = None
+ cookie = morsel_to_cookie(morsel)
+ assert cookie.expires is None
+
+
+class TestMorselToCookieMaxAge(unittest.TestCase):
+
+ """Tests for morsel_to_cookie when morsel contains max-age."""
+
+ def test_max_age_valid_int(self):
+ """Test case where a valid max age in seconds is passed."""
+
+ morsel = Morsel()
+ morsel['max-age'] = 60
+ cookie = morsel_to_cookie(morsel)
+ assert isinstance(cookie.expires, int)
+
+ def test_max_age_invalid_str(self):
+ """Test case where a invalid max age is passed."""
+
+ morsel = Morsel()
+ morsel['max-age'] = 'woops'
+ with pytest.raises(TypeError):
+ morsel_to_cookie(morsel)
+
+
+class TestTimeout:
+ def test_stream_timeout(self, httpbin):
+ try:
+ requests.get(httpbin('delay/10'), timeout=2.0)
+ except requests.exceptions.Timeout as e:
+ assert 'Read timed out' in e.args[0].args[0]
+
+ def test_invalid_timeout(self, httpbin):
+ with pytest.raises(ValueError) as e:
+ requests.get(httpbin('get'), timeout=(3, 4, 5))
+ assert '(connect, read)' in str(e)
+
+ with pytest.raises(ValueError) as e:
+ requests.get(httpbin('get'), timeout="foo")
+ assert 'must be an int or float' in str(e)
+
+ def test_none_timeout(self, httpbin):
+ """ Check that you can set None as a valid timeout value.
+
+ To actually test this behavior, we'd want to check that setting the
+ timeout to None actually lets the request block past the system default
+ timeout. However, this would make the test suite unbearably slow.
+ Instead we verify that setting the timeout to None does not prevent the
+ request from succeeding.
+ """
+ r = requests.get(httpbin('get'), timeout=None)
+ assert r.status_code == 200
+
+ def test_read_timeout(self, httpbin):
+ try:
+ requests.get(httpbin('delay/10'), timeout=(None, 0.1))
+ assert False, "The recv() request should time out."
+ except ReadTimeout:
+ pass
+
+ def test_connect_timeout(self):
+ try:
+ requests.get(TARPIT, timeout=(0.1, None))
+ assert False, "The connect() request should time out."
+ except ConnectTimeout as e:
+ assert isinstance(e, ConnectionError)
+ assert isinstance(e, Timeout)
+
+ def test_total_timeout_connect(self):
+ try:
+ requests.get(TARPIT, timeout=(0.1, 0.1))
+ assert False, "The connect() request should time out."
+ except ConnectTimeout:
+ pass
+
+ def test_encoded_methods(self, httpbin):
+ """See: https://github.com/kennethreitz/requests/issues/2316"""
+ r = requests.request(b'GET', httpbin('get'))
+ assert r.ok
+
+
+SendCall = collections.namedtuple('SendCall', ('args', 'kwargs'))
+
+
+class RedirectSession(SessionRedirectMixin):
+ def __init__(self, order_of_redirects):
+ self.redirects = order_of_redirects
+ self.calls = []
+ self.max_redirects = 30
+ self.cookies = {}
+ self.trust_env = False
+
+ def send(self, *args, **kwargs):
+ self.calls.append(SendCall(args, kwargs))
+ return self.build_response()
+
+ def build_response(self):
+ request = self.calls[-1].args[0]
+ r = requests.Response()
+
+ try:
+ r.status_code = int(self.redirects.pop(0))
+ except IndexError:
+ r.status_code = 200
+
+ r.headers = CaseInsensitiveDict({'Location': '/'})
+ r.raw = self._build_raw()
+ r.request = request
+ return r
+
+ def _build_raw(self):
+ string = StringIO.StringIO('')
+ setattr(string, 'release_conn', lambda *args: args)
+ return string
+
+
+class TestRedirects:
+ default_keyword_args = {
+ 'stream': False,
+ 'verify': True,
+ 'cert': None,
+ 'timeout': None,
+ 'allow_redirects': False,
+ 'proxies': {},
+ }
+
+ def test_requests_are_updated_each_time(self, httpbin):
+ session = RedirectSession([303, 307])
+ prep = requests.Request('POST', httpbin('post')).prepare()
+ r0 = session.send(prep)
+ assert r0.request.method == 'POST'
+ assert session.calls[-1] == SendCall((r0.request,), {})
+ redirect_generator = session.resolve_redirects(r0, prep)
+ for response in redirect_generator:
+ assert response.request.method == 'GET'
+ send_call = SendCall((response.request,),
+ TestRedirects.default_keyword_args)
+ assert session.calls[-1] == send_call
+
+
+
+@pytest.fixture
+def list_of_tuples():
+ return [
+ (('a', 'b'), ('c', 'd')),
+ (('c', 'd'), ('a', 'b')),
+ (('a', 'b'), ('c', 'd'), ('e', 'f')),
+ ]
+
+
+def test_data_argument_accepts_tuples(list_of_tuples):
+ """
+ Ensure that the data argument will accept tuples of strings
+ and properly encode them.
+ """
+ for data in list_of_tuples:
+ p = PreparedRequest()
+ p.prepare(
+ method='GET',
+ url='http://www.example.com',
+ data=data,
+ hooks=default_hooks()
+ )
+ assert p.body == urlencode(data)
+
+
+def assert_copy(p, p_copy):
+ for attr in ('method', 'url', 'headers', '_cookies', 'body', 'hooks'):
+ assert getattr(p, attr) == getattr(p_copy, attr)
+
+
+def test_prepared_request_empty_copy():
+ p = PreparedRequest()
+ assert_copy(p, p.copy())
+
+
+def test_prepared_request_no_cookies_copy():
+ p = PreparedRequest()
+ p.prepare(
+ method='GET',
+ url='http://www.example.com',
+ data='foo=bar',
+ hooks=default_hooks()
+ )
+ assert_copy(p, p.copy())
+
+
+def test_prepared_request_complete_copy():
+ p = PreparedRequest()
+ p.prepare(
+ method='GET',
+ url='http://www.example.com',
+ data='foo=bar',
+ hooks=default_hooks(),
+ cookies={'foo': 'bar'}
+ )
+ assert_copy(p, p.copy())
+
+
+def test_prepare_unicode_url():
+ p = PreparedRequest()
+ p.prepare(
+ method='GET',
+ url=u('http://www.example.com/üniçø∂é'),
+ )
+ assert_copy(p, p.copy())
+
+
+def test_urllib3_retries(httpbin):
+ from requests.packages.urllib3.util import Retry
+ s = requests.Session()
+ s.mount('http://', HTTPAdapter(max_retries=Retry(
+ total=2, status_forcelist=[500]
+ )))
+
+ with pytest.raises(RetryError):
+ s.get(httpbin('status/500'))
+
+
+def test_urllib3_pool_connection_closed(httpbin):
+ s = requests.Session()
+ s.mount('http://', HTTPAdapter(pool_connections=0, pool_maxsize=0))
+
+ try:
+ s.get(httpbin('status/200'))
+ except ConnectionError as e:
+ assert u"Pool is closed." in str(e)
+
+
+def test_vendor_aliases():
+ from requests.packages import urllib3
+ from requests.packages import chardet
+
+ with pytest.raises(ImportError):
+ from requests.packages import webbrowser
+
+
+if __name__ == '__main__':
+ unittest.main()