33
33
check_disallow_instantiation ,
34
34
threading_helper ,
35
35
)
36
+ from _testcapi import INT_MAX
37
+ from os import SEEK_SET , SEEK_CUR , SEEK_END
36
38
from test .support .os_helper import TESTFN , unlink , temp_dir
37
39
38
40
@@ -1041,11 +1043,163 @@ def test_same_query_in_multiple_cursors(self):
1041
1043
self .assertEqual (cu .fetchall (), [(1 ,)])
1042
1044
1043
1045
1046
+ class BlobTests (unittest .TestCase ):
1047
+ def setUp (self ):
1048
+ self .cx = sqlite .connect (":memory:" )
1049
+ self .cx .execute ("create table test(b blob)" )
1050
+ self .data = b"this blob data string is exactly fifty bytes long!"
1051
+ self .cx .execute ("insert into test(b) values (?)" , (self .data ,))
1052
+ self .blob = self .cx .blobopen ("test" , "b" , 1 )
1053
+
1054
+ def tearDown (self ):
1055
+ self .blob .close ()
1056
+ self .cx .close ()
1057
+
1058
+ def test_blob_seek_and_tell (self ):
1059
+ self .blob .seek (10 )
1060
+ self .assertEqual (self .blob .tell (), 10 )
1061
+
1062
+ self .blob .seek (10 , SEEK_SET )
1063
+ self .assertEqual (self .blob .tell (), 10 )
1064
+
1065
+ self .blob .seek (10 , SEEK_CUR )
1066
+ self .assertEqual (self .blob .tell (), 20 )
1067
+
1068
+ self .blob .seek (- 10 , SEEK_END )
1069
+ self .assertEqual (self .blob .tell (), 40 )
1070
+
1071
+ def test_blob_seek_error (self ):
1072
+ msg_oor = "offset out of blob range"
1073
+ msg_orig = "'origin' should be os.SEEK_SET, os.SEEK_CUR, or os.SEEK_END"
1074
+ msg_of = "seek offset results in overflow"
1075
+
1076
+ dataset = (
1077
+ (ValueError , msg_oor , lambda : self .blob .seek (1000 )),
1078
+ (ValueError , msg_oor , lambda : self .blob .seek (- 10 )),
1079
+ (ValueError , msg_orig , lambda : self .blob .seek (10 , - 1 )),
1080
+ (ValueError , msg_orig , lambda : self .blob .seek (10 , 3 )),
1081
+ )
1082
+ for exc , msg , fn in dataset :
1083
+ with self .subTest (exc = exc , msg = msg , fn = fn ):
1084
+ self .assertRaisesRegex (exc , msg , fn )
1085
+
1086
+ # Force overflow errors
1087
+ self .blob .seek (1 , SEEK_SET )
1088
+ with self .assertRaisesRegex (OverflowError , msg_of ):
1089
+ self .blob .seek (INT_MAX , SEEK_CUR )
1090
+ with self .assertRaisesRegex (OverflowError , msg_of ):
1091
+ self .blob .seek (INT_MAX , SEEK_END )
1092
+
1093
+ def test_blob_read (self ):
1094
+ buf = self .blob .read ()
1095
+ self .assertEqual (buf , self .data )
1096
+
1097
+ def test_blob_read_oversized (self ):
1098
+ buf = self .blob .read (len (self .data ) * 2 )
1099
+ self .assertEqual (buf , self .data )
1100
+
1101
+ def test_blob_read_advance_offset (self ):
1102
+ n = 10
1103
+ buf = self .blob .read (n )
1104
+ self .assertEqual (buf , self .data [:n ])
1105
+ self .assertEqual (self .blob .tell (), n )
1106
+
1107
+ def test_blob_read_at_offset (self ):
1108
+ self .blob .seek (10 )
1109
+ self .assertEqual (self .blob .read (10 ), self .data [10 :20 ])
1110
+
1111
+ def test_blob_read_error_row_changed (self ):
1112
+ self .cx .execute ("update test set b='aaaa' where rowid=1" )
1113
+ with self .assertRaises (sqlite .OperationalError ):
1114
+ self .blob .read ()
1115
+
1116
+ def test_blob_write (self ):
1117
+ new_data = b"new data" .ljust (50 )
1118
+ self .blob .write (new_data )
1119
+ row = self .cx .execute ("select b from test" ).fetchone ()
1120
+ self .assertEqual (row [0 ], new_data )
1121
+
1122
+ def test_blob_write_at_offset (self ):
1123
+ new_data = b"c" * 25
1124
+ self .blob .seek (25 )
1125
+ self .blob .write (new_data )
1126
+ row = self .cx .execute ("select b from test" ).fetchone ()
1127
+ self .assertEqual (row [0 ], self .data [:25 ] + new_data )
1128
+
1129
+ def test_blob_write_advance_offset (self ):
1130
+ self .blob .write (b"d" * 10 )
1131
+ self .assertEqual (self .blob .tell (), 10 )
1132
+
1133
+ def test_blob_write_error_length (self ):
1134
+ with self .assertRaisesRegex (ValueError , "data longer than blob" ):
1135
+ self .blob .write (b"a" * 1000 )
1136
+
1137
+ def test_blob_write_error_row_changed (self ):
1138
+ self .cx .execute ("update test set b='aaaa' where rowid=1" )
1139
+ with self .assertRaises (sqlite .OperationalError ):
1140
+ self .blob .write (b"aaa" )
1141
+
1142
+ def test_blob_write_error_readonly (self ):
1143
+ ro_blob = self .cx .blobopen ("test" , "b" , 1 , readonly = True )
1144
+ with self .assertRaisesRegex (sqlite .OperationalError , "readonly" ):
1145
+ ro_blob .write (b"aaa" )
1146
+ ro_blob .close ()
1147
+
1148
+ def test_blob_open_error (self ):
1149
+ dataset = (
1150
+ (("test" , "b" , 1 ), {"name" : "notexisting" }),
1151
+ (("notexisting" , "b" , 1 ), {}),
1152
+ (("test" , "notexisting" , 1 ), {}),
1153
+ (("test" , "b" , 2 ), {}),
1154
+ )
1155
+ regex = "no such"
1156
+ for args , kwds in dataset :
1157
+ with self .subTest (args = args , kwds = kwds ):
1158
+ with self .assertRaisesRegex (sqlite .OperationalError , regex ):
1159
+ self .cx .blobopen (* args , ** kwds )
1160
+
1161
+ def test_blob_sequence_not_supported (self ):
1162
+ with self .assertRaises (TypeError ):
1163
+ self .blob + self .blob
1164
+ with self .assertRaises (TypeError ):
1165
+ self .blob * 5
1166
+ with self .assertRaises (TypeError ):
1167
+ b"a" in self .blob
1168
+
1169
+ def test_blob_closed (self ):
1170
+ with memory_database () as cx :
1171
+ cx .execute ("create table test(b blob)" )
1172
+ cx .execute ("insert into test values (zeroblob(100))" )
1173
+ blob = cx .blobopen ("test" , "b" , 1 )
1174
+ blob .close ()
1175
+
1176
+ msg = "Cannot operate on a closed blob"
1177
+ with self .assertRaisesRegex (sqlite .ProgrammingError , msg ):
1178
+ blob .read ()
1179
+ with self .assertRaisesRegex (sqlite .ProgrammingError , msg ):
1180
+ blob .write (b"" )
1181
+ with self .assertRaisesRegex (sqlite .ProgrammingError , msg ):
1182
+ blob .seek (0 )
1183
+ with self .assertRaisesRegex (sqlite .ProgrammingError , msg ):
1184
+ blob .tell ()
1185
+
1186
+ def test_blob_closed_db_read (self ):
1187
+ with memory_database () as cx :
1188
+ cx .execute ("create table test(b blob)" )
1189
+ cx .execute ("insert into test(b) values (zeroblob(100))" )
1190
+ blob = cx .blobopen ("test" , "b" , 1 )
1191
+ cx .close ()
1192
+ self .assertRaisesRegex (sqlite .ProgrammingError ,
1193
+ "Cannot operate on a closed database" ,
1194
+ blob .read )
1195
+
1196
+
1044
1197
class ThreadTests (unittest .TestCase ):
1045
1198
def setUp (self ):
1046
1199
self .con = sqlite .connect (":memory:" )
1047
1200
self .cur = self .con .cursor ()
1048
- self .cur .execute ("create table test(name text)" )
1201
+ self .cur .execute ("create table test(name text, b blob)" )
1202
+ self .cur .execute ("insert into test values('blob', zeroblob(1))" )
1049
1203
1050
1204
def tearDown (self ):
1051
1205
self .cur .close ()
@@ -1080,6 +1234,7 @@ def test_check_connection_thread(self):
1080
1234
lambda : self .con .create_collation ("foo" , None ),
1081
1235
lambda : self .con .setlimit (sqlite .SQLITE_LIMIT_LENGTH , - 1 ),
1082
1236
lambda : self .con .getlimit (sqlite .SQLITE_LIMIT_LENGTH ),
1237
+ lambda : self .con .blobopen ("test" , "b" , 1 ),
1083
1238
]
1084
1239
if hasattr (sqlite .Connection , "serialize" ):
1085
1240
fns .append (lambda : self .con .serialize ())
0 commit comments