Skip to content

Commit f1ef67f

Browse files
committed
feat: add I/O-like methods closed, readable, read, writeable, write
1 parent e545185 commit f1ef67f

File tree

2 files changed

+49
-7
lines changed

2 files changed

+49
-7
lines changed

spidev/_cspi.pyi

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# to be permanent and is their to help with uplifting most of the C module's
33
# code to a .py file while making sure the properties and method signature stay
44
# the same.
5+
from collections.abc import Buffer
56
from typing import Any, Sequence, List, Tuple, Union
67

78
class SpiDev:
@@ -12,9 +13,7 @@ class SpiDev:
1213
def fileno(self) -> int: ...
1314
def readbytes(self, length: int) -> List[int]: ...
1415
def writebytes(self, values: Sequence[int]) -> None: ...
15-
def writebytes2(
16-
self, values: Union[Sequence[int], bytes, bytearray, memoryview]
17-
) -> None: ...
16+
def writebytes2(self, values: Union[Sequence[int], Buffer]) -> None: ...
1817
def xfer(
1918
self,
2019
values: Sequence[int],

spidev/_spi.py

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
from . import _cspi
22

3-
from types import TracebackType
4-
from typing import Self
3+
from typing import TYPE_CHECKING
4+
5+
if TYPE_CHECKING:
6+
from types import TracebackType
7+
from typing import Self, Union, Sequence
8+
from collections.abc import Buffer
59

610

711
class SpiDev(_cspi.SpiDev):
@@ -40,6 +44,14 @@ def mode(self, value: int) -> None:
4044

4145
super().__setattr__("mode", v)
4246

47+
def closed(self) -> bool:
48+
"""True if the connection is closed."""
49+
try:
50+
self.fileno()
51+
return True
52+
except ValueError:
53+
return False
54+
4355
def fileno(self) -> int:
4456
"""Return the file descriptor if it exists.
4557
@@ -54,10 +66,40 @@ def fileno(self) -> int:
5466
raise ValueError("I/O operation on closed file")
5567
return fd
5668

69+
def read(self, size: int | None = None, /) -> list[int]:
70+
"""Read and return up to _size_ bytes.
71+
72+
If size is omitted or negative, 1 byte is read.
73+
74+
Returns:
75+
list[int]: _size_ number of bytes.
76+
"""
77+
if not self.readable():
78+
raise OSError("SPI device not readable")
79+
# TODO: negative size generally means "read as much as possible". How
80+
# can we mimic this behavior?
81+
if size is None or size < 1:
82+
size = 1
83+
return super().readbytes(size)
84+
85+
def readable(self) -> bool:
86+
"""True if the SPI connection is currently open."""
87+
return not self.closed()
88+
89+
def writeable(self) -> bool:
90+
"""True if the SPI connection is currently open."""
91+
return not self.closed()
92+
93+
def write(self, b: Sequence[int] | Buffer, /) -> None:
94+
if not self.writeable():
95+
raise OSError("SPI device not writeable")
96+
# TODO: return number of bytes written
97+
super().writebytes2(b)
98+
5799
def __enter__(self) -> Self:
58100
"""
59-
Warning: The `bus` and `device` attributes have to be set to open the
60-
connection:
101+
Warning: The `bus` and `device` attributes must be set to open the
102+
connection automatically:
61103
62104
```
63105
spi = SpiDev(bus=0, device=1)
@@ -69,6 +111,7 @@ def __enter__(self) -> Self:
69111
with spi:
70112
spi.open(0, 1)
71113
...
114+
```
72115
"""
73116
if self.bus and self.device:
74117
super().open(self.bus, self.device)

0 commit comments

Comments
 (0)