bufaccess.pyx 28.1 KB
Newer Older
1 2 3 4 5 6 7 8 9
# Tests the buffer access syntax functionality by constructing
# mock buffer objects.
#
# Note that the buffers are mock objects created for testing
# the buffer access behaviour -- for instance there is no flag
# checking in the buffer objects (why test our test case?), rather
# what we want to test is what is passed into the flags argument.
#

10
from __future__ import unicode_literals
11

12 13 14 15
from cpython.object cimport PyObject
from cpython.ref cimport Py_INCREF, Py_DECREF
cimport cython

16
__test__ = {}
17

Stefan Behnel's avatar
Stefan Behnel committed
18
import sys
19
# Predicates that are called with a test function's name; when any of them
# returns a true value, ``testcase`` does not register that function.
exclude = []

if getattr(sys, 'pypy_version_info', None) is not None:
    # disable object-in-buffer tests in PyPy
    import re
    exclude.append(re.compile('object').search)

Dag Sverre Seljebotn's avatar
Dag Sverre Seljebotn committed
27
def testcase(func):
28 29 30
    for e in exclude:
        if e(func.__name__):
            return func
Stefan Behnel's avatar
Stefan Behnel committed
31
    __test__[func.__name__] = func.__doc__
32 33
    return func

34

35
include "mockbuffers.pxi"
36 37 38 39 40

#
# Buffer acquire and release tests
#

41 42
def nousage():
    """
    The challenge here is just compilation.
    """
    # The buffer variable is declared but never assigned or read.
    cdef object[int, ndim=2] buf
46

47

48
@testcase
49 50 51 52 53 54 55 56 57 58 59 60
def disabled_usage(obj):
    """
    The challenge here is just compilation.

    >>> disabled_usage(None)
    """
    cdef object[int, ndim=2] buf
    # Dead branch: the assignment to the buffer variable has to compile,
    # but it is never executed.
    if False:
        buf = obj
    return obj


61 62 63 64 65 66 67 68 69 70 71 72 73
@testcase
def nousage_cleanup(x):
    """
    Declares a buffer variable without ever assigning to it, then leaves
    the function either normally or, when x is true, by raising.

    >>> nousage_cleanup(False)
    >>> nousage_cleanup(True)
    Traceback (most recent call last):
    RuntimeError
    """
    cdef object[int, ndim=2] buf
    if x:
        # Exit through the error path while buf is still unassigned.
        raise RuntimeError()


Dag Sverre Seljebotn's avatar
Dag Sverre Seljebotn committed
74
@testcase
75 76
def acquire_release(o1, o2):
    """
Dag Sverre Seljebotn's avatar
Dag Sverre Seljebotn committed
77 78
    >>> A = IntMockBuffer("A", range(6))
    >>> B = IntMockBuffer("B", range(6))
79 80 81 82 83
    >>> acquire_release(A, B)
    acquired A
    released A
    acquired B
    released B
84 85 86 87
    >>> acquire_release(None, None)
    >>> acquire_release(None, B)
    acquired B
    released B
88 89 90 91
    """
    cdef object[int] buf
    buf = o1
    buf = o2
Dag Sverre Seljebotn's avatar
Dag Sverre Seljebotn committed
92

93
@testcase
94 95 96 97
def acquire_raise(o):
    """
    Apparently, doctest won't handle mixed exceptions and print
    stats, so need to circumvent this.
98

Dag Sverre Seljebotn's avatar
Dag Sverre Seljebotn committed
99
    >>> A = IntMockBuffer("A", range(6))
100 101
    >>> A.resetlog()
    >>> acquire_raise(A)
102 103 104
    Traceback (most recent call last):
        ...
    Exception: on purpose
105
    >>> A.printlog()
Dag Sverre Seljebotn's avatar
Dag Sverre Seljebotn committed
106 107
    acquired A
    released A
108

109 110 111 112 113
    """
    cdef object[int] buf
    buf = o
    raise Exception("on purpose")

114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204
@testcase
def acquire_failure1():
    """
    Assigns an ErrorBuffer instance to a buffer variable that already
    holds a working buffer.  As the expected output shows, the working
    buffer stays readable in the exception handler and is released once.

    >>> acquire_failure1()
    acquired working
    0 3
    0 3
    released working
    """
    cdef object[int] buf
    buf = IntMockBuffer("working", range(4))
    print buf[0], buf[3]
    try:
        # ErrorBuffer is not defined in this file (presumably it comes from
        # mockbuffers.pxi and fails on buffer acquisition -- confirm there).
        buf = ErrorBuffer()
        # Only reached if the assignment above did not raise.
        assert False
    except Exception:
        print buf[0], buf[3]

@testcase
def acquire_failure2():
    """
    Same scenario as acquire_failure1, except that the working buffer is
    assigned in the cdef declaration itself instead of in a separate
    statement.

    >>> acquire_failure2()
    acquired working
    0 3
    0 3
    released working
    """
    cdef object[int] buf = IntMockBuffer("working", range(4))
    print buf[0], buf[3]
    try:
        # ErrorBuffer is not defined in this file (presumably it comes from
        # mockbuffers.pxi and fails on buffer acquisition -- confirm there).
        buf = ErrorBuffer()
        # Only reached if the assignment above did not raise.
        assert False
    except Exception:
        print buf[0], buf[3]

@testcase
def acquire_failure3():
    """
    Assigns a plain int to a buffer variable that already holds a working
    buffer.  As the expected output shows, the working buffer is released
    and then acquired again before the exception handler reads from it.

    >>> acquire_failure3()
    acquired working
    0 3
    released working
    acquired working
    0 3
    released working
    """
    cdef object[int] buf
    buf = IntMockBuffer("working", range(4))
    print buf[0], buf[3]
    try:
        buf = 3
        # Only reached if the assignment above did not raise.
        assert False
    except Exception:
        print buf[0], buf[3]

@testcase
def acquire_failure4():
    """
    Same scenario as acquire_failure3, except that the working buffer is
    assigned in the cdef declaration itself instead of in a separate
    statement.

    >>> acquire_failure4()
    acquired working
    0 3
    released working
    acquired working
    0 3
    released working
    """
    cdef object[int] buf = IntMockBuffer("working", range(4))
    print buf[0], buf[3]
    try:
        buf = 2
        # Only reached if the assignment above did not raise.
        assert False
    except Exception:
        print buf[0], buf[3]

@testcase
def acquire_failure5():
    """
    Covers the case where an assignment fails and the previously held
    buffer cannot be acquired again either; the doctest expects the
    ValueError shown below.

    >>> acquire_failure5()
    Traceback (most recent call last):
       ...
    ValueError: Buffer acquisition failed on assignment; and then reacquiring the old buffer failed too!
    """
    cdef object[int] buf
    buf = IntMockBuffer("working", range(4))
    # Presumably tells the mock to fail its next acquisition (the attribute
    # is not defined in this file) -- confirm in mockbuffers.pxi.
    buf.fail = True
    buf = 3


@testcase
def acquire_nonbuffer1(first, second=None):
    """
Stefan Behnel's avatar
Stefan Behnel committed
205
    >>> acquire_nonbuffer1(3)   # doctest: +ELLIPSIS
206
    Traceback (most recent call last):
207
    TypeError:... 'int'...
Stefan Behnel's avatar
Stefan Behnel committed
208
    >>> acquire_nonbuffer1(type)   # doctest: +ELLIPSIS
209
    Traceback (most recent call last):
210
    TypeError:... 'type'...
Stefan Behnel's avatar
Stefan Behnel committed
211
    >>> acquire_nonbuffer1(None, 2)   # doctest: +ELLIPSIS
212
    Traceback (most recent call last):
213
    TypeError:... 'int'...
214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238
    """
    cdef object[int] buf
    buf = first
    buf = second

@testcase
def acquire_nonbuffer2():
    """
    Assigns the ErrorBuffer class object itself (not an instance) to a
    buffer variable that already holds a working buffer.  As the expected
    output shows, the working buffer is released and then acquired again
    before the exception handler reads from it.

    >>> acquire_nonbuffer2()
    acquired working
    0 3
    released working
    acquired working
    0 3
    released working
    """
    cdef object[int] buf = IntMockBuffer("working", range(4))
    print buf[0], buf[3]
    try:
        # Note: no call parentheses -- the class is assigned, not an instance.
        buf = ErrorBuffer
        # Only reached if the assignment above did not raise.
        assert False
    except Exception:
        print buf[0], buf[3]


239 240 241
@testcase
def as_argument(object[int] bufarg, int n):
    """
Dag Sverre Seljebotn's avatar
Dag Sverre Seljebotn committed
242
    >>> A = IntMockBuffer("A", range(6))
243 244
    >>> as_argument(A, 6)
    acquired A
245
    0 1 2 3 4 5 END
246
    released A
247 248 249 250
    """
    cdef int i
    for i in range(n):
        print bufarg[i],
251
    print 'END'
252

253 254 255 256 257 258 259 260 261 262 263 264 265 266
@testcase
def as_argument_not_none(object[int] bufarg not None):
    """
    Takes a buffer argument declared "not None".  Per the doctest, a real
    buffer is acquired and released around the body, while passing None
    raises TypeError.

    >>> A = IntMockBuffer("A", range(6))
    >>> as_argument_not_none(A)
    acquired A
    ACCEPTED
    released A
    >>> as_argument_not_none(None)
    Traceback (most recent call last):
    TypeError: Argument 'bufarg' must not be None
    """
    # The buffer contents are never read; only argument handling is of interest.
    print 'ACCEPTED'

267 268 269
@testcase
def as_argument_defval(object[int] bufarg=IntMockBuffer('default', range(6)), int n=6):
    """
270
    >>> as_argument_defval()
271
    acquired default
272
    0 1 2 3 4 5 END
273
    released default
Dag Sverre Seljebotn's avatar
Dag Sverre Seljebotn committed
274
    >>> A = IntMockBuffer("A", range(6))
275 276
    >>> as_argument_defval(A, 6)
    acquired A
277
    0 1 2 3 4 5 END
278
    released A
279
    """
280
    cdef int i
281 282
    for i in range(n):
        print bufarg[i],
283
    print 'END'
284

285 286 287
@testcase
def cdef_assignment(obj, n):
    """
Dag Sverre Seljebotn's avatar
Dag Sverre Seljebotn committed
288
    >>> A = IntMockBuffer("A", range(6))
289 290
    >>> cdef_assignment(A, 6)
    acquired A
291
    0 1 2 3 4 5 END
292
    released A
293

294 295 296 297 298
    """
    cdef object[int] buf = obj
    cdef int i
    for i in range(n):
        print buf[i],
299
    print 'END'
300

301 302 303
@testcase
def forin_assignment(objs, int pick):
    """
Dag Sverre Seljebotn's avatar
Dag Sverre Seljebotn committed
304 305
    >>> A = IntMockBuffer("A", range(6))
    >>> B = IntMockBuffer("B", range(6))
306 307 308 309 310 311 312
    >>> forin_assignment([A, B, A, A], 2)
    acquired A
    2
    released A
    acquired B
    2
    released B
313
    acquired A
314 315 316 317
    2
    released A
    acquired A
    2
318
    released A
319 320 321 322 323 324 325 326
    """
    cdef object[int] buf
    for buf in objs:
        print buf[pick]

@testcase
def cascaded_buffer_assignment(obj):
    """
Dag Sverre Seljebotn's avatar
Dag Sverre Seljebotn committed
327
    >>> A = IntMockBuffer("A", range(6))
328 329 330 331 332 333 334 335 336 337 338 339
    >>> cascaded_buffer_assignment(A)
    acquired A
    acquired A
    released A
    released A
    """
    cdef object[int] a, b
    a = b = obj

@testcase
def tuple_buffer_assignment1(a, b):
    """
Dag Sverre Seljebotn's avatar
Dag Sverre Seljebotn committed
340 341
    >>> A = IntMockBuffer("A", range(6))
    >>> B = IntMockBuffer("B", range(6))
342 343
    >>> tuple_buffer_assignment1(A, B)
    acquired A
344
    acquired B
345
    released A
346
    released B
347 348 349
    """
    cdef object[int] x, y
    x, y = a, b
350

351 352 353
@testcase
def tuple_buffer_assignment2(tup):
    """
Dag Sverre Seljebotn's avatar
Dag Sverre Seljebotn committed
354 355
    >>> A = IntMockBuffer("A", range(6))
    >>> B = IntMockBuffer("B", range(6))
356 357 358 359 360 361 362 363
    >>> tuple_buffer_assignment2((A, B))
    acquired A
    acquired B
    released A
    released B
    """
    cdef object[int] x, y
    x, y = tup
364 365 366 367 368 369 370 371 372 373 374 375 376 377

@testcase
def explicitly_release_buffer():
    """
    Rebinds a buffer variable to None.  Per the expected output, the
    buffer is released at that point, before "After release" is printed.

    >>> explicitly_release_buffer()
    acquired A
    released A
    After release
    """
    cdef object[int] x = IntMockBuffer("A", range(10))
    # Assigning None drops the buffer held by x.
    x = None
    print "After release"

#
378
# Getting items and index bounds checking
379
#
380
@testcase
381
def get_int_2d(object[int, ndim=2] buf, int i, int j):
382
    """
Dag Sverre Seljebotn's avatar
Dag Sverre Seljebotn committed
383
    >>> C = IntMockBuffer("C", range(6), (2,3))
384
    >>> get_int_2d(C, 1, 1)
385 386
    acquired C
    released C
387
    4
388 389

    Check negative indexing:
390
    >>> get_int_2d(C, -1, 0)
391 392
    acquired C
    released C
393 394
    3
    >>> get_int_2d(C, -1, -2)
395 396
    acquired C
    released C
397 398
    4
    >>> get_int_2d(C, -2, -3)
399 400
    acquired C
    released C
401 402
    0

403
    Out-of-bounds errors:
404 405 406
    >>> get_int_2d(C, 2, 0)
    Traceback (most recent call last):
        ...
407 408 409 410 411
    IndexError: Out of bounds on buffer access (axis 0)
    >>> get_int_2d(C, 0, -4)
    Traceback (most recent call last):
        ...
    IndexError: Out of bounds on buffer access (axis 1)
412 413
    """
    return buf[i, j]
414

415
@testcase
416
def get_int_2d_uintindex(object[int, ndim=2] buf, unsigned int i, unsigned int j):
417 418
    """
    Unsigned indexing:
Dag Sverre Seljebotn's avatar
Dag Sverre Seljebotn committed
419
    >>> C = IntMockBuffer("C", range(6), (2,3))
420 421 422 423 424 425 426 427
    >>> get_int_2d_uintindex(C, 0, 0)
    acquired C
    released C
    0
    >>> get_int_2d_uintindex(C, 1, 2)
    acquired C
    released C
    5
428 429 430 431
    """
    # This is most interesting with regards to the C code
    # generated.
    return buf[i, j]
Dag Sverre Seljebotn's avatar
Dag Sverre Seljebotn committed
432

433
@testcase
434
def set_int_2d(object[int, ndim=2] buf, int i, int j, int value):
435 436 437
    """
    Uses get_int_2d to read back the value afterwards. For pure
    unit test, one should support reading in MockBuffer instead.
438

Dag Sverre Seljebotn's avatar
Dag Sverre Seljebotn committed
439
    >>> C = IntMockBuffer("C", range(6), (2,3))
440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463
    >>> set_int_2d(C, 1, 1, 10)
    acquired C
    released C
    >>> get_int_2d(C, 1, 1)
    acquired C
    released C
    10

    Check negative indexing:
    >>> set_int_2d(C, -1, 0, 3)
    acquired C
    released C
    >>> get_int_2d(C, -1, 0)
    acquired C
    released C
    3

    >>> set_int_2d(C, -1, -2, 8)
    acquired C
    released C
    >>> get_int_2d(C, -1, -2)
    acquired C
    released C
    8
464

465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481
    >>> set_int_2d(C, -2, -3, 9)
    acquired C
    released C
    >>> get_int_2d(C, -2, -3)
    acquired C
    released C
    9

    Out-of-bounds errors:
    >>> set_int_2d(C, 2, 0, 19)
    Traceback (most recent call last):
        ...
    IndexError: Out of bounds on buffer access (axis 0)
    >>> set_int_2d(C, 0, -4, 19)
    Traceback (most recent call last):
        ...
    IndexError: Out of bounds on buffer access (axis 1)
482

483 484
    """
    buf[i, j] = value
485

486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542
@testcase
def set_int_2d_cascaded(object[int, ndim=2] buf, int i, int j, int value):
    """
    Stores value into buf[i, j] and into a C int in one cascaded
    assignment, then returns the C int.

    Uses get_int_2d to read back the value afterwards. For pure
    unit test, one should support reading in MockBuffer instead.

    >>> C = IntMockBuffer("C", range(6), (2,3))
    >>> set_int_2d_cascaded(C, 1, 1, 10)
    acquired C
    released C
    10
    >>> get_int_2d(C, 1, 1)
    acquired C
    released C
    10

    Check negative indexing:
    >>> set_int_2d_cascaded(C, -1, 0, 3)
    acquired C
    released C
    3
    >>> get_int_2d(C, -1, 0)
    acquired C
    released C
    3

    >>> set_int_2d_cascaded(C, -1, -2, 8)
    acquired C
    released C
    8
    >>> get_int_2d(C, -1, -2)
    acquired C
    released C
    8

    >>> set_int_2d_cascaded(C, -2, -3, 9)
    acquired C
    released C
    9
    >>> get_int_2d(C, -2, -3)
    acquired C
    released C
    9

    Out-of-bounds errors:
    >>> set_int_2d_cascaded(C, 2, 0, 19)
    Traceback (most recent call last):
    IndexError: Out of bounds on buffer access (axis 0)
    >>> set_int_2d_cascaded(C, 0, -4, 19)
    Traceback (most recent call last):
    IndexError: Out of bounds on buffer access (axis 1)

    """
    cdef int casc_value
    # Cascaded assignment: one right-hand side, two targets (a buffer
    # element and a plain C variable).
    buf[i, j] = casc_value = value
    return casc_value

543 544 545 546 547 548 549
@testcase
def list_comprehension(object[int] buf, len):
    """
    >>> list_comprehension(IntMockBuffer(None, [1,2,3]), 3)
    1|2|3
    """
    cdef int i
550
    print u"|".join([unicode(buf[i]) for i in range(len)])
551

552 553 554 555 556 557 558 559
#
# The negative_indices buffer option
#
@testcase
def no_negative_indices(object[int, negative_indices=False] buf, int idx):
    """
    The most interesting thing here is to inspect the C source and
    make sure optimal code is produced.
560

561 562 563 564 565 566 567 568 569 570
    >>> A = IntMockBuffer(None, range(6))
    >>> no_negative_indices(A, 3)
    3
    >>> no_negative_indices(A, -1)
    Traceback (most recent call last):
        ...
    IndexError: Out of bounds on buffer access (axis 0)
    """
    return buf[idx]

571
@testcase
572 573
@cython.wraparound(False)
def wraparound_directive(object[int] buf, int pos_idx, int neg_idx):
574
    """
575
    Again, the most interesting thing here is to inspect the C source.
576

577 578 579 580
    >>> A = IntMockBuffer(None, range(4))
    >>> wraparound_directive(A, 2, -1)
    5
    >>> wraparound_directive(A, -1, 2)
581 582
    Traceback (most recent call last):
        ...
583
    IndexError: Out of bounds on buffer access (axis 0)
584
    """
585 586 587 588
    cdef int byneg
    with cython.wraparound(True):
        byneg = buf[neg_idx]
    return buf[pos_idx] + byneg
589 590


591 592 593 594 595 596 597 598 599 600 601
#
# Test which flags are passed.
#
@testcase
def readonly(obj):
    """
    >>> R = UnsignedShortMockBuffer("R", range(27), shape=(3, 3, 3))
    >>> readonly(R)
    acquired R
    25
    released R
602
    >>> [str(x) for x in R.recieved_flags]  # Works in both py2 and py3
603 604
    ['FORMAT', 'INDIRECT', 'ND', 'STRIDES']
    """
605
    cdef object[unsigned short int, ndim=3] buf = obj
606 607 608 609 610 611 612 613 614
    print buf[2, 2, 1]

@testcase
def writable(obj):
    """
    >>> R = UnsignedShortMockBuffer("R", range(27), shape=(3, 3, 3))
    >>> writable(R)
    acquired R
    released R
615
    >>> [str(x) for x in R.recieved_flags] # Py2/3
616 617
    ['FORMAT', 'INDIRECT', 'ND', 'STRIDES', 'WRITABLE']
    """
618
    cdef object[unsigned short int, ndim=3] buf = obj
619 620
    buf[2, 2, 1] = 23

621
@testcase
622
def strided(object[int, ndim=1, mode='strided'] buf):
623 624 625 626 627 628
    """
    >>> A = IntMockBuffer("A", range(4))
    >>> strided(A)
    acquired A
    released A
    2
629
    >>> [str(x) for x in A.recieved_flags] # Py2/3
630
    ['FORMAT', 'ND', 'STRIDES']
631 632 633 634

    Check that the suboffsets were patched back prior to release.
    >>> A.release_ok
    True
635 636 637
    """
    return buf[2]

638 639 640 641 642 643 644 645 646 647
@testcase
def c_contig(object[int, ndim=1, mode='c'] buf):
    """
    Declares a 1D buffer argument with mode='c' and returns its third
    item.  Per the doctest, C_CONTIGUOUS is among the flags the mock
    buffer records.

    >>> A = IntMockBuffer(None, range(4))
    >>> c_contig(A)
    2
    >>> [str(x) for x in A.recieved_flags]
    ['FORMAT', 'ND', 'STRIDES', 'C_CONTIGUOUS']
    """
    return buf[2]
648

649 650 651 652
@testcase
def c_contig_2d(object[int, ndim=2, mode='c'] buf):
    """
    Multi-dim has a separate implementation.

    Declares a 2D buffer argument with mode='c' and returns item [1, 3].

    >>> A = IntMockBuffer(None, range(12), shape=(3,4))
    >>> c_contig_2d(A)
    7
    >>> [str(x) for x in A.recieved_flags]
    ['FORMAT', 'ND', 'STRIDES', 'C_CONTIGUOUS']
    """
    return buf[1, 3]

@testcase
def f_contig(object[int, ndim=1, mode='fortran'] buf):
    """
    Declares a 1D buffer argument with mode='fortran' and returns its
    third item.  Per the doctest, F_CONTIGUOUS is among the flags the
    mock buffer records.

    >>> A = IntMockBuffer(None, range(4))
    >>> f_contig(A)
    2
    >>> [str(x) for x in A.recieved_flags]
    ['FORMAT', 'ND', 'STRIDES', 'F_CONTIGUOUS']
    """
    return buf[2]

@testcase
def f_contig_2d(object[int, ndim=2, mode='fortran'] buf):
    """
    Must set up strides manually to ensure Fortran ordering.
677

678 679 680 681 682 683 684 685
    >>> A = IntMockBuffer(None, range(12), shape=(4,3), strides=(1, 4))
    >>> f_contig_2d(A)
    7
    >>> [str(x) for x in A.recieved_flags]
    ['FORMAT', 'ND', 'STRIDES', 'F_CONTIGUOUS']
    """
    return buf[3, 1]

686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706
#
# Test compiler options for bounds checking. We create an array with a
# safe "boundary" (memory allocated outside of what it publishes) and then
# check whether we get back what we stored in that memory or an error.

@testcase
def safe_get(object[int] buf, int idx):
    """
    >>> A = IntMockBuffer(None, range(10), shape=(3,), offset=5)

    Validate our testing buffer...
    >>> safe_get(A, 0)
    5
    >>> safe_get(A, 2)
    7
    >>> safe_get(A, -3)
    5

    Access outside it. This is already done above for bounds check
    testing but we include it to tell the story right.
707

708 709 710 711 712 713 714 715 716 717 718 719
    >>> safe_get(A, -4)
    Traceback (most recent call last):
        ...
    IndexError: Out of bounds on buffer access (axis 0)
    >>> safe_get(A, 3)
    Traceback (most recent call last):
        ...
    IndexError: Out of bounds on buffer access (axis 0)
    """
    return buf[idx]

@testcase
720
@cython.boundscheck(False) # outer decorators should take precedence
721
@cython.boundscheck(True)
722 723 724 725 726 727 728 729 730 731 732 733 734
def unsafe_get(object[int] buf, int idx):
    """
    Access outside of the area the buffer publishes.
    >>> A = IntMockBuffer(None, range(10), shape=(3,), offset=5)
    >>> unsafe_get(A, -4)
    4
    >>> unsafe_get(A, -5)
    3
    >>> unsafe_get(A, 3)
    8
    """
    return buf[idx]

735 736 737 738 739
@testcase
@cython.boundscheck(False)
def unsafe_get_nonegative(object[int, negative_indices=False] buf, int idx):
    """
    Also inspect the C source to see that it is optimal...
740

741 742 743 744 745 746
    >>> A = IntMockBuffer(None, range(10), shape=(3,), offset=5)
    >>> unsafe_get_nonegative(A, -2)
    3
    """
    return buf[idx]

747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762
@testcase
def mixed_get(object[int] buf, int unsafe_idx, int safe_idx):
    """
    Reads one item with bounds checking switched off and one with it
    switched on, using "with cython.boundscheck(...)" blocks, and returns
    both values as a tuple.

    >>> A = IntMockBuffer(None, range(10), shape=(3,), offset=5)
    >>> mixed_get(A, -4, 0)
    (4, 5)
    >>> mixed_get(A, 0, -4)
    Traceback (most recent call last):
        ...
    IndexError: Out of bounds on buffer access (axis 0)
    """
    # Unchecked read: per the doctest, index -4 yields 4 instead of an error.
    with cython.boundscheck(False):
        one = buf[unsafe_idx]
    # Checked read: per the doctest, index -4 raises IndexError here.
    with cython.boundscheck(True):
        two = buf[safe_idx]
    return (one, two)
763

764 765 766
#
# Coercions
#
767 768 769 770 771 772 773 774 775 776 777 778 779 780
## @testcase
## def coercions(object[unsigned char] uc):
##     """
## TODO
##     """
##     print type(uc[0])
##     uc[0] = -1
##     print uc[0]
##     uc[0] = <int>3.14
##     print uc[0]

##     cdef char* ch = b"asfd"
##     cdef object[object] objbuf
##     objbuf[3] = ch
781

782 783 784 785 786 787

#
# Testing that accessing data using various types of buffer access
# all works.
#

788 789 790 791 792 793 794
def printbuf_int(object[int] buf, shape):
    # Utility func
    # Prints the first shape[0] items of a 1D int buffer on one line,
    # followed by END.  Only shape[0] is read from shape.
    cdef int i
    for i in range(shape[0]):
        # The trailing comma keeps the output on the same line.
        print buf[i],
    print 'END'

795 796 797 798 799

@testcase
def printbuf_int_2d(o, shape):
    """
    Strided:
800

801 802
    >>> printbuf_int_2d(IntMockBuffer("A", range(6), (2,3)), (2,3))
    acquired A
803 804
    0 1 2 END
    3 4 5 END
805 806 807
    released A
    >>> printbuf_int_2d(IntMockBuffer("A", range(100), (3,3), strides=(20,5)), (3,3))
    acquired A
808 809 810
    0 5 10 END
    20 25 30 END
    40 45 50 END
811 812 813 814 815
    released A

    Indirect:
    >>> printbuf_int_2d(IntMockBuffer("A", [[1,2],[3,4]]), (2,2))
    acquired A
816 817
    1 2 END
    3 4 END
818 819 820
    released A
    """
    # should make shape builtin
821
    cdef object[int, ndim=2] buf
822 823 824 825 826
    buf = o
    cdef int i, j
    for i in range(shape[0]):
        for j in range(shape[1]):
            print buf[i, j],
827
        print 'END'
828

829
@testcase
def printbuf_float(o, shape):
    """
    Prints the first shape[0] items of a 1D float buffer on one line,
    followed by END.

    >>> printbuf_float(FloatMockBuffer("F", [1.0, 1.25, 0.75, 1.0]), (4,))
    acquired F
    1.0 1.25 0.75 1.0 END
    released F
    """

    # should make shape builtin
    cdef object[float] buf
    buf = o
    cdef int i
    for i in range(shape[0]):
        # The trailing comma keeps the output on the same line.
        print buf[i],
    print "END"
845 846


847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864
#
# Test assignments
#
@testcase
def inplace_operators(object[int] buf):
    """
    Applies +=, *= and -= to buffer items, indexing once with a literal
    and once with a C int variable.

    >>> buf = IntMockBuffer(None, [2, 2])
    >>> inplace_operators(buf)
    >>> printbuf_int(buf, (2,))
    0 3 END
    """
    cdef int j = 0
    # Starting from [2, 2]: item 1 becomes 3.
    buf[1] += 1
    # Item 0 (indexed through the variable j) becomes 4 ...
    buf[j] *= 2
    # ... and then 0.
    buf[0] -= 4



865 866 867
#
# Typedefs
#
868 869 870 871
# Test three layers of typedefs going through a header file for plain int,
# and just a single header-file typedef for floats and unsigned.

ctypedef int td_cy_int
872
cdef extern from "bufaccess.h":
873 874 875 876
    ctypedef td_cy_int td_h_short # Defined as short, but Cython doesn't know this!
    ctypedef float td_h_double # Defined as double
    ctypedef unsigned int td_h_ushort # Defined as unsigned short
ctypedef td_h_short td_h_cy_short
877 878

@testcase
879
def printbuf_td_cy_int(object[td_cy_int] buf, shape):
880
    """
881
    >>> printbuf_td_cy_int(IntMockBuffer(None, range(3)), (3,))
882
    0 1 2 END
883
    >>> printbuf_td_cy_int(ShortMockBuffer(None, range(3)), (3,))
884 885
    Traceback (most recent call last):
       ...
886
    ValueError: Buffer dtype mismatch, expected 'td_cy_int' but got 'short'
887 888 889 890
    """
    cdef int i
    for i in range(shape[0]):
        print buf[i],
891
    print 'END'
892 893

@testcase
894
def printbuf_td_h_short(object[td_h_short] buf, shape):
895
    """
896
    >>> printbuf_td_h_short(ShortMockBuffer(None, range(3)), (3,))
897
    0 1 2 END
898
    >>> printbuf_td_h_short(IntMockBuffer(None, range(3)), (3,))
899 900
    Traceback (most recent call last):
       ...
901
    ValueError: Buffer dtype mismatch, expected 'td_h_short' but got 'int'
902
    """
903 904 905
    cdef int i
    for i in range(shape[0]):
        print buf[i],
906
    print 'END'
907 908

@testcase
909
def printbuf_td_h_cy_short(object[td_h_cy_short] buf, shape):
910
    """
911
    >>> printbuf_td_h_cy_short(ShortMockBuffer(None, range(3)), (3,))
912
    0 1 2 END
913
    >>> printbuf_td_h_cy_short(IntMockBuffer(None, range(3)), (3,))
914 915
    Traceback (most recent call last):
       ...
916
    ValueError: Buffer dtype mismatch, expected 'td_h_cy_short' but got 'int'
917 918 919 920
    """
    cdef int i
    for i in range(shape[0]):
        print buf[i],
921
    print 'END'
922

923 924 925 926 927 928 929 930
@testcase
def printbuf_td_h_ushort(object[td_h_ushort] buf, shape):
    """
    >>> printbuf_td_h_ushort(UnsignedShortMockBuffer(None, range(3)), (3,))
    0 1 2 END
    >>> printbuf_td_h_ushort(ShortMockBuffer(None, range(3)), (3,))
    Traceback (most recent call last):
       ...
931
    ValueError: Buffer dtype mismatch, expected 'td_h_ushort' but got 'short'
932 933 934 935 936 937 938 939 940 941 942 943 944 945
    """
    cdef int i
    for i in range(shape[0]):
        print buf[i],
    print 'END'

@testcase
def printbuf_td_h_double(object[td_h_double] buf, shape):
    """
    >>> printbuf_td_h_double(DoubleMockBuffer(None, [0.25, 1, 3.125]), (3,))
    0.25 1.0 3.125 END
    >>> printbuf_td_h_double(FloatMockBuffer(None, [0.25, 1, 3.125]), (3,))
    Traceback (most recent call last):
       ...
946
    ValueError: Buffer dtype mismatch, expected 'td_h_double' but got 'float'
947 948 949 950 951 952 953
    """
    cdef int i
    for i in range(shape[0]):
        print buf[i],
    print 'END'


954 955 956 957 958 959 960 961 962
#
# Object access
#
def addref(*args):
    """Call Py_INCREF once on every positional argument."""
    for obj in args:
        Py_INCREF(obj)
def decref(*args):
    """Call Py_DECREF once on every positional argument."""
    for obj in args:
        Py_DECREF(obj)

def get_refcount(x):
    """Return the ob_refcnt field of x, read through a PyObject* cast."""
    return (<PyObject*>x).ob_refcnt
964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985

@testcase
def printbuf_object(object[object] buf, shape):
    """
    Prints the repr and the reference count of the first shape[0] items
    of an object buffer, one item per line.

    Only play with unique objects, interned numbers etc. will have
    unpredictable refcounts.

    ObjectMockBuffer doesn't do anything about increfing/decrefing,
    we do the "buffer implementor" refcounting directly in the
    testcase.

    >>> a, b, c = "globally_unique_string_23234123", {4:23}, [34,3]
    >>> get_refcount(a), get_refcount(b), get_refcount(c)
    (2, 2, 2)
    >>> A = ObjectMockBuffer(None, [a, b, c])
    >>> printbuf_object(A, (3,))
    'globally_unique_string_23234123' 2
    {4: 23} 2
    [34, 3] 2
    """
    cdef int i
    for i in range(shape[0]):
        # The refcount is read straight from the object header through a
        # PyObject* cast of the buffer item.
        print repr(buf[i]), (<PyObject*>buf[i]).ob_refcnt
987 988 989 990 991 992 993 994 995 996

@testcase
def assign_to_object(object[object] buf, int idx, obj):
    """
    See comments on printbuf_object above.

    >>> a, b = [1, 2, 3], [4, 5, 6]
    >>> get_refcount(a), get_refcount(b)
    (2, 2)
    >>> addref(a)
997
    >>> A = ObjectMockBuffer(None, [1, a]) # 1, ...,otherwise it thinks nested lists...
998 999 1000 1001 1002 1003 1004 1005
    >>> get_refcount(a), get_refcount(b)
    (3, 2)
    >>> assign_to_object(A, 1, b)
    >>> get_refcount(a), get_refcount(b)
    (2, 3)
    >>> decref(b)
    """
    buf[idx] = obj
1006

1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021
@testcase
def assign_temporary_to_object(object[object] buf):
    """
    See comments on printbuf_object above.

    >>> a, b = [1, 2, 3], {4:23}
    >>> get_refcount(a)
    2
    >>> addref(a)
    >>> A = ObjectMockBuffer(None, [b, a])
    >>> get_refcount(a)
    3
    >>> assign_temporary_to_object(A)
    >>> get_refcount(a)
    2
1022

1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033
    >>> printbuf_object(A, (2,))
    {4: 23} 2
    {1: 8} 2

    To avoid leaking a reference in our testcase we need to
    replace the temporary with something we can manually decref :-)
    >>> assign_to_object(A, 1, a)
    >>> decref(a)
    """
    buf[1] = {3-2: 2+(2*4)-2}

1034 1035 1036 1037 1038 1039 1040
#
# cast option
#
@testcase
def buffer_cast(object[unsigned int, cast=True] buf, int idx):
    """
    Round-trip a signed int through unsigned int buffer access.
1041

1042 1043 1044 1045 1046 1047
    >>> A = IntMockBuffer(None, [-100])
    >>> buffer_cast(A, 0)
    -100
    """
    cdef unsigned int data = buf[idx]
    return <int>data
1048

1049 1050 1051 1052
@testcase
def buffer_cast_fails(object[char, cast=True] buf):
    """
    Cannot cast between datatype of different sizes.
1053

1054 1055 1056
    >>> buffer_cast_fails(IntMockBuffer(None, [0]))
    Traceback (most recent call last):
        ...
1057
    ValueError: Item size of buffer (4 bytes) does not match size of 'char' (1 byte)
1058 1059
    """
    return buf[0]
1060

1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075
#
# Typed buffers
#
@testcase
def typedbuffer1(obj):
    """
    >>> typedbuffer1(IntMockBuffer("A", range(10)))
    acquired A
    released A
    >>> typedbuffer1(None)
    >>> typedbuffer1(4)
    Traceback (most recent call last):
       ...
    TypeError: Cannot convert int to bufaccess.IntMockBuffer
    """
1076
    cdef IntMockBuffer[int, ndim=1] buf = obj
1077 1078

@testcase
1079
def typedbuffer2(IntMockBuffer[int, ndim=1] obj):
1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095
    """
    >>> typedbuffer2(IntMockBuffer("A", range(10)))
    acquired A
    released A
    >>> typedbuffer2(None)
    >>> typedbuffer2(4)
    Traceback (most recent call last):
       ...
    TypeError: Argument 'obj' has incorrect type (expected bufaccess.IntMockBuffer, got int)
    """
    pass

#
# Test __cythonbufferdefaults__
#
@testcase
1096
def bufdefaults1(IntStridedMockBuffer[int, ndim=1] buf):
1097
    """
1098 1099 1100
    For IntStridedMockBuffer, mode should be
    "strided" by defaults which should show
    up in the flags.
1101

1102 1103 1104 1105
    >>> A = IntStridedMockBuffer("A", range(10))
    >>> bufdefaults1(A)
    acquired A
    released A
1106
    >>> [str(x) for x in A.recieved_flags]
1107 1108 1109
    ['FORMAT', 'ND', 'STRIDES']
    """
    pass
1110

1111

1112 1113 1114
@testcase
def basic_struct(object[MyStruct] buf):
    """
1115
    See also buffmt.pyx
1116

1117
    >>> basic_struct(MyStructMockBuffer(None, [(1, 2, 3, 4, 5)]))
1118
    1 2 3 4 5
1119
    >>> basic_struct(MyStructMockBuffer(None, [(1, 2, 3, 4, 5)], format="ccqii"))
1120
    1 2 3 4 5
1121 1122 1123
    """
    print buf[0].a, buf[0].b, buf[0].c, buf[0].d, buf[0].e

1124 1125 1126
@testcase
def nested_struct(object[NestedStruct] buf):
    """
1127
    See also buffmt.pyx
1128

1129 1130 1131 1132 1133 1134 1135
    >>> nested_struct(NestedStructMockBuffer(None, [(1, 2, 3, 4, 5)]))
    1 2 3 4 5
    >>> nested_struct(NestedStructMockBuffer(None, [(1, 2, 3, 4, 5)], format="T{ii}T{2i}i"))
    1 2 3 4 5
    """
    print buf[0].x.a, buf[0].x.b, buf[0].y.a, buf[0].y.b, buf[0].z

1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164
@testcase
def packed_struct(object[PackedStruct] buf):
    """
    Prints the fields a and b of the first item of a PackedStruct buffer.
    The doctest runs it with the mock's default format and with two
    explicit format strings.

    See also buffmt.pyx

    >>> packed_struct(PackedStructMockBuffer(None, [(1, 2)]))
    1 2
    >>> packed_struct(PackedStructMockBuffer(None, [(1, 2)], format="T{c^i}"))
    1 2
    >>> packed_struct(PackedStructMockBuffer(None, [(1, 2)], format="T{c=i}"))
    1 2

    """
    print buf[0].a, buf[0].b

@testcase
def nested_packed_struct(object[NestedPackedStruct] buf):
    """
    Prints the fields a, b, sub.a, sub.b and c of the first item of a
    NestedPackedStruct buffer.  The doctest runs it with the mock's
    default format and with two explicit format strings.

    See also buffmt.pyx

    >>> nested_packed_struct(NestedPackedStructMockBuffer(None, [(1, 2, 3, 4, 5)]))
    1 2 3 4 5
    >>> nested_packed_struct(NestedPackedStructMockBuffer(None, [(1, 2, 3, 4, 5)], format="ci^ci@i"))
    1 2 3 4 5
    >>> nested_packed_struct(NestedPackedStructMockBuffer(None, [(1, 2, 3, 4, 5)], format="^c@i^ci@i"))
    1 2 3 4 5
    """
    print buf[0].a, buf[0].b, buf[0].sub.a, buf[0].sub.b, buf[0].c

1165

1166
@testcase
1167
def complex_dtype(object[long double complex] buf):
1168
    """
1169 1170
    >>> complex_dtype(LongComplexMockBuffer(None, [(0, -1)]))
    -1j
1171
    """
1172
    print buf[0]
1173

1174
@testcase
1175
def complex_inplace(object[long double complex] buf):
1176
    """
1177 1178 1179 1180 1181 1182
    >>> complex_inplace(LongComplexMockBuffer(None, [(0, -1)]))
    (1+1j)
    """
    buf[0] = buf[0] + 1 + 2j
    print buf[0]

1183 1184 1185
@testcase
def complex_struct_dtype(object[LongComplex] buf):
    """
1186 1187
    Note that the format string is "Zg" rather than "2g", yet a struct
    is accessed.
1188 1189
    >>> complex_struct_dtype(LongComplexMockBuffer(None, [(0, -1)]))
    0.0 -1.0
1190 1191
    """
    print buf[0].real, buf[0].imag
1192 1193 1194 1195

@testcase
def complex_struct_inplace(object[LongComplex] buf):
    """
1196
    >>> complex_struct_inplace(LongComplexMockBuffer(None, [(0, -1)]))
1197 1198 1199 1200 1201
    1.0 1.0
    """
    buf[0].real += 1
    buf[0].imag += 2
    print buf[0].real, buf[0].imag
1202

1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216
#
# Nogil
#
@testcase
@cython.boundscheck(False)
def buffer_nogil():
    """
    Writes to a buffer item inside a "with nogil" block, with bounds
    checking disabled for the whole function, and returns that item.

    >>> buffer_nogil()
    10
    """
    cdef object[int] buf = IntMockBuffer(None, [1,2,3])
    # The item assignment happens without the GIL held.
    with nogil:
        buf[1] = 10
    return buf[1]
1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229

@testcase
def buffer_nogil_oob():
    """
    Writes to index 5 of a three-item buffer inside a "with nogil" block.
    Unlike buffer_nogil, no boundscheck(False) decorator is applied; per
    the doctest the write raises IndexError.

    >>> buffer_nogil_oob()
    Traceback (most recent call last):
        ...
    IndexError: Out of bounds on buffer access (axis 0)
    """
    cdef object[int] buf = IntMockBuffer(None, [1,2,3])
    with nogil:
        # Out-of-bounds index: the buffer above has only three items.
        buf[5] = 10
    return buf[1]
1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243

def get_int():
    """Return the integer 10; used as a call-expression source of a value."""
    value = 10
    return value

@testcase
def test_inplace_assignment():
    """
    Stores the result of a Python-level function call into an int buffer
    item and prints the item back.

    >>> test_inplace_assignment()
    10
    """
    cdef object[int, ndim=1] buf = IntMockBuffer(None, [1, 2, 3])

    # get_int() is a plain def function, so its result arrives as a Python
    # object and is stored into the int buffer item.
    buf[0] = get_int()
    print buf[0]
1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254

@testcase
def test_nested_assignment():
    """
    Uses an item of one buffer as the index into another buffer, both for
    the assignment and for the read-back.

    >>> test_nested_assignment()
    100
    """
    cdef object[int] inner = IntMockBuffer(None, [1, 2, 3])
    cdef object[int] outer = IntMockBuffer(None, [1, 2, 3])
    # inner[0] is 1, so this writes 100 to outer[1].
    outer[inner[0]] = 100
    return outer[inner[0]]