230 lines
		
	
	
		
			8.0 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			230 lines
		
	
	
		
			8.0 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
#!/usr/bin/env python
 | 
						|
 | 
						|
# test_quote.py - unit test for strings quoting
 | 
						|
#
 | 
						|
# Copyright (C) 2007-2019 Daniele Varrazzo  <daniele.varrazzo@gmail.com>
 | 
						|
# Copyright (C) 2020-2021 The Psycopg Team
 | 
						|
#
 | 
						|
# psycopg2 is free software: you can redistribute it and/or modify it
 | 
						|
# under the terms of the GNU Lesser General Public License as published
 | 
						|
# by the Free Software Foundation, either version 3 of the License, or
 | 
						|
# (at your option) any later version.
 | 
						|
#
 | 
						|
# In addition, as a special exception, the copyright holders give
 | 
						|
# permission to link this program with the OpenSSL library (or with
 | 
						|
# modified versions of OpenSSL that use the same license as OpenSSL),
 | 
						|
# and distribute linked combinations including the two.
 | 
						|
#
 | 
						|
# You must obey the GNU Lesser General Public License in all respects for
 | 
						|
# all of the code used other than OpenSSL.
 | 
						|
#
 | 
						|
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
 | 
						|
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 | 
						|
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
 | 
						|
# License for more details.
 | 
						|
 | 
						|
from . import testutils
 | 
						|
import unittest
 | 
						|
from .testutils import ConnectingTestCase, skip_if_crdb
 | 
						|
 | 
						|
import psycopg2
 | 
						|
import psycopg2.extensions
 | 
						|
from psycopg2.extensions import adapt, quote_ident
 | 
						|
 | 
						|
 | 
						|
class QuotingTestCase(ConnectingTestCase):
 | 
						|
    r"""Checks the correct quoting of strings and binary objects.
 | 
						|
 | 
						|
    Since ver. 8.1, PostgreSQL is moving towards SQL standard conforming
 | 
						|
    strings, where the backslash (\) is treated as literal character,
 | 
						|
    not as escape. To treat the backslash as a C-style escapes, PG supports
 | 
						|
    the E'' quotes.
 | 
						|
 | 
						|
    This test case checks that the E'' quotes are used whenever they are
 | 
						|
    needed. The tests are expected to pass with all PostgreSQL server versions
 | 
						|
    (currently tested with 7.4 <= PG <= 8.3beta) and with any
 | 
						|
    'standard_conforming_strings' server parameter value.
 | 
						|
    The tests also check that no warning is raised ('escape_string_warning'
 | 
						|
    should be on).
 | 
						|
 | 
						|
    https://www.postgresql.org/docs/current/static/sql-syntax-lexical.html#SQL-SYNTAX-STRINGS
 | 
						|
    https://www.postgresql.org/docs/current/static/runtime-config-compatible.html
 | 
						|
    """
 | 
						|
    def test_string(self):
 | 
						|
        data = """some data with \t chars
 | 
						|
        to escape into, 'quotes' and \\ a backslash too.
 | 
						|
        """
 | 
						|
        data += "".join(map(chr, range(1, 127)))
 | 
						|
 | 
						|
        curs = self.conn.cursor()
 | 
						|
        curs.execute("SELECT %s;", (data,))
 | 
						|
        res = curs.fetchone()[0]
 | 
						|
 | 
						|
        self.assertEqual(res, data)
 | 
						|
        self.assert_(not self.conn.notices)
 | 
						|
 | 
						|
    def test_string_null_terminator(self):
 | 
						|
        curs = self.conn.cursor()
 | 
						|
        data = 'abcd\x01\x00cdefg'
 | 
						|
 | 
						|
        try:
 | 
						|
            curs.execute("SELECT %s", (data,))
 | 
						|
        except ValueError as e:
 | 
						|
            self.assertEquals(str(e),
 | 
						|
                'A string literal cannot contain NUL (0x00) characters.')
 | 
						|
        else:
 | 
						|
            self.fail("ValueError not raised")
 | 
						|
 | 
						|
    def test_binary(self):
 | 
						|
        data = b"""some data with \000\013 binary
 | 
						|
        stuff into, 'quotes' and \\ a backslash too.
 | 
						|
        """
 | 
						|
        data += bytes(list(range(256)))
 | 
						|
 | 
						|
        curs = self.conn.cursor()
 | 
						|
        curs.execute("SELECT %s::bytea;", (psycopg2.Binary(data),))
 | 
						|
        res = curs.fetchone()[0].tobytes()
 | 
						|
 | 
						|
        if res[0] in (b'x', ord(b'x')) and self.conn.info.server_version >= 90000:
 | 
						|
            return self.skipTest(
 | 
						|
                "bytea broken with server >= 9.0, libpq < 9")
 | 
						|
 | 
						|
        self.assertEqual(res, data)
 | 
						|
        self.assert_(not self.conn.notices)
 | 
						|
 | 
						|
    def test_unicode(self):
 | 
						|
        curs = self.conn.cursor()
 | 
						|
        curs.execute("SHOW server_encoding")
 | 
						|
        server_encoding = curs.fetchone()[0]
 | 
						|
        if server_encoding != "UTF8":
 | 
						|
            return self.skipTest(
 | 
						|
                f"Unicode test skipped since server encoding is {server_encoding}")
 | 
						|
 | 
						|
        data = """some data with \t chars
 | 
						|
        to escape into, 'quotes', \u20ac euro sign and \\ a backslash too.
 | 
						|
        """
 | 
						|
        data += "".join(map(chr, [u for u in range(1, 65536)
 | 
						|
            if not 0xD800 <= u <= 0xDFFF]))    # surrogate area
 | 
						|
        self.conn.set_client_encoding('UNICODE')
 | 
						|
 | 
						|
        psycopg2.extensions.register_type(psycopg2.extensions.UNICODE, self.conn)
 | 
						|
        curs.execute("SELECT %s::text;", (data,))
 | 
						|
        res = curs.fetchone()[0]
 | 
						|
 | 
						|
        self.assertEqual(res, data)
 | 
						|
        self.assert_(not self.conn.notices)
 | 
						|
 | 
						|
    @skip_if_crdb("encoding")
 | 
						|
    def test_latin1(self):
 | 
						|
        self.conn.set_client_encoding('LATIN1')
 | 
						|
        curs = self.conn.cursor()
 | 
						|
        data = bytes(list(range(32, 127))
 | 
						|
            + list(range(160, 256))).decode('latin1')
 | 
						|
 | 
						|
        # as string
 | 
						|
        curs.execute("SELECT %s::text;", (data,))
 | 
						|
        res = curs.fetchone()[0]
 | 
						|
        self.assertEqual(res, data)
 | 
						|
        self.assert_(not self.conn.notices)
 | 
						|
 | 
						|
 | 
						|
    @skip_if_crdb("encoding")
 | 
						|
    def test_koi8(self):
 | 
						|
        self.conn.set_client_encoding('KOI8')
 | 
						|
        curs = self.conn.cursor()
 | 
						|
        data = bytes(list(range(32, 127))
 | 
						|
            + list(range(128, 256))).decode('koi8_r')
 | 
						|
 | 
						|
        # as string
 | 
						|
        curs.execute("SELECT %s::text;", (data,))
 | 
						|
        res = curs.fetchone()[0]
 | 
						|
        self.assertEqual(res, data)
 | 
						|
        self.assert_(not self.conn.notices)
 | 
						|
 | 
						|
    def test_bytes(self):
 | 
						|
        snowman = "\u2603"
 | 
						|
        conn = self.connect()
 | 
						|
        conn.set_client_encoding('UNICODE')
 | 
						|
        psycopg2.extensions.register_type(psycopg2.extensions.BYTES, conn)
 | 
						|
        curs = conn.cursor()
 | 
						|
        curs.execute("select %s::text", (snowman,))
 | 
						|
        x = curs.fetchone()[0]
 | 
						|
        self.assert_(isinstance(x, bytes))
 | 
						|
        self.assertEqual(x, snowman.encode('utf8'))
 | 
						|
 | 
						|
 | 
						|
class TestQuotedString(ConnectingTestCase):
 | 
						|
    def test_encoding_from_conn(self):
 | 
						|
        q = psycopg2.extensions.QuotedString('hi')
 | 
						|
        self.assertEqual(q.encoding, 'latin1')
 | 
						|
 | 
						|
        self.conn.set_client_encoding('utf_8')
 | 
						|
        q.prepare(self.conn)
 | 
						|
        self.assertEqual(q.encoding, 'utf_8')
 | 
						|
 | 
						|
 | 
						|
class TestQuotedIdentifier(ConnectingTestCase):
 | 
						|
    def test_identifier(self):
 | 
						|
        self.assertEqual(quote_ident('blah-blah', self.conn), '"blah-blah"')
 | 
						|
        self.assertEqual(quote_ident('quote"inside', self.conn), '"quote""inside"')
 | 
						|
 | 
						|
    @testutils.skip_before_postgres(8, 0)
 | 
						|
    def test_unicode_ident(self):
 | 
						|
        snowman = "\u2603"
 | 
						|
        quoted = '"' + snowman + '"'
 | 
						|
        self.assertEqual(quote_ident(snowman, self.conn), quoted)
 | 
						|
 | 
						|
 | 
						|
class TestStringAdapter(ConnectingTestCase):
 | 
						|
    def test_encoding_default(self):
 | 
						|
        a = adapt("hello")
 | 
						|
        self.assertEqual(a.encoding, 'latin1')
 | 
						|
        self.assertEqual(a.getquoted(), b"'hello'")
 | 
						|
 | 
						|
        # NOTE: we can't really test an encoding different from utf8, because
 | 
						|
        # when encoding without connection the libpq will use parameters from
 | 
						|
        # a previous one, so what would happens depends jn the tests run order.
 | 
						|
        # egrave = u'\xe8'
 | 
						|
        # self.assertEqual(adapt(egrave).getquoted(), "'\xe8'")
 | 
						|
 | 
						|
    def test_encoding_error(self):
 | 
						|
        snowman = "\u2603"
 | 
						|
        a = adapt(snowman)
 | 
						|
        self.assertRaises(UnicodeEncodeError, a.getquoted)
 | 
						|
 | 
						|
    def test_set_encoding(self):
 | 
						|
        # Note: this works-ish mostly in case when the standard db connection
 | 
						|
        # we test with is utf8, otherwise the encoding chosen by PQescapeString
 | 
						|
        # may give bad results.
 | 
						|
        snowman = "\u2603"
 | 
						|
        a = adapt(snowman)
 | 
						|
        a.encoding = 'utf8'
 | 
						|
        self.assertEqual(a.encoding, 'utf8')
 | 
						|
        self.assertEqual(a.getquoted(), b"'\xe2\x98\x83'")
 | 
						|
 | 
						|
    def test_connection_wins_anyway(self):
 | 
						|
        snowman = "\u2603"
 | 
						|
        a = adapt(snowman)
 | 
						|
        a.encoding = 'latin9'
 | 
						|
 | 
						|
        self.conn.set_client_encoding('utf8')
 | 
						|
        a.prepare(self.conn)
 | 
						|
 | 
						|
        self.assertEqual(a.encoding, 'utf_8')
 | 
						|
        self.assertQuotedEqual(a.getquoted(), b"'\xe2\x98\x83'")
 | 
						|
 | 
						|
    def test_adapt_bytes(self):
 | 
						|
        snowman = "\u2603"
 | 
						|
        self.conn.set_client_encoding('utf8')
 | 
						|
        a = psycopg2.extensions.QuotedString(snowman.encode('utf8'))
 | 
						|
        a.prepare(self.conn)
 | 
						|
        self.assertQuotedEqual(a.getquoted(), b"'\xe2\x98\x83'")
 | 
						|
 | 
						|
 | 
						|
def test_suite():
 | 
						|
    return unittest.TestLoader().loadTestsFromName(__name__)
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
    unittest.main()
 |