Skip to content

API reference

Auto-generated from the source docstrings.

Client

ZKDevice

ZKDevice(host: str = '', port: int = 4370, password: int = 0, timeout: float = 10.0, transport: Transport | None = None)

High-level read-only client for a ZKTeco device.

ZKDevice is the primary entry point of the library. It wraps a :class:~openzk.transport.Transport and a :class:~openzk.session.Session to expose convenient methods for reading device metadata, users, attendance records, operation logs, and biometric templates. All operations are read-only; nothing on the device is mutated.

Use it as a context manager so that the connection, authentication, SDK handshake, and capability read happen on entry and are cleanly torn down on exit.

Example

with ZKDevice("192.168.1.201") as dev: ... info = dev.device_info() ... users = dev.users() ... for record in dev.attendance(): ... print(record)

Initialize a device client.

Parameters:

Name Type Description Default
host str

Device IP address or hostname.

''
port int

Device TCP service port (ZKTeco default 4370).

4370
password int

Numeric comm key used for CMD_AUTH; 0 when unset.

0
timeout float

Socket connect/read timeout in seconds.

10.0
transport Transport | None

Optional pre-built :class:~openzk.transport.Transport. Primarily for dependency injection and testing; when provided, host/port/password/timeout are ignored and the given transport is used as-is.

None
Source code in src/openzk/client.py
def __init__(self, host: str = "", port: int = 4370, password: int = 0,
             timeout: float = 10.0, transport: Transport | None = None) -> None:
    """Initialize a device client.

    Args:
        host: Device IP address or hostname.
        port: Device TCP service port (ZKTeco default ``4370``).
        password: Numeric comm key used for CMD_AUTH; ``0`` when unset.
        timeout: Socket connect/read timeout in seconds.
        transport: Optional pre-built :class:`~openzk.transport.Transport`.
            Primarily for dependency injection and testing; when provided,
            ``host``/``port``/``password``/``timeout`` are ignored and the
            given transport is used as-is.
    """
    self.transport = transport or TcpTransport(host, port, timeout, password)
    self.session = Session(self.transport)
    self._caps: dict[str, str] = {}

__enter__

__enter__() -> ZKDevice

Open the session and read device capabilities.

Performs connect, authentication, the SDK handshake, and a capability read (caching the result for :meth:device_info).

Returns:

Name Type Description
This ZKDevice

class:ZKDevice instance, ready for use.

Source code in src/openzk/client.py
def __enter__(self) -> ZKDevice:
    """Open the session and read device capabilities.

    Performs connect, authentication, the SDK handshake, and a capability
    read (caching the result for :meth:`device_info`).

    Returns:
        This :class:`ZKDevice` instance, ready for use.
    """
    self.session.open()
    self._caps = self.session.capabilities()
    return self

__exit__

__exit__(*exc: object) -> None

Restore the device SDK handshake flag and disconnect.

Parameters:

Name Type Description Default
*exc object

Standard exception tuple from the with block (unused).

()
Source code in src/openzk/client.py
def __exit__(self, *exc: object) -> None:
    """Restore the device SDK handshake flag and disconnect.

    Args:
        *exc: Standard exception tuple from the ``with`` block (unused).
    """
    self.session.close()

option

option(name: str) -> str | None

Read a raw device option by name.

Parameters:

Name Type Description Default
name str

The option key (e.g. "FirmVer" or "~SerialNumber").

required

Returns:

Type Description
str | None

The raw option value as a string, or None if the device did not

str | None

return a value for that option.

Source code in src/openzk/client.py
def option(self, name: str) -> str | None:
    """Read a raw device option by name.

    Args:
        name: The option key (e.g. ``"FirmVer"`` or ``"~SerialNumber"``).

    Returns:
        The raw option value as a string, or ``None`` if the device did not
        return a value for that option.
    """
    return self.session.read_option(name)

device_info

device_info() -> DeviceInfo

Read device metadata and record counts.

Combines cached capabilities (serial, platform, etc.) with a live read of the free-size table for user/finger/attendance/face counts.

Returns:

Name Type Description
A DeviceInfo

class:~openzk.models.DeviceInfo with serial, firmware,

DeviceInfo

platform, the four record counts, and the full capabilities map.

Source code in src/openzk/client.py
def device_info(self) -> DeviceInfo:
    """Read device metadata and record counts.

    Combines cached capabilities (serial, platform, etc.) with a live read
    of the free-size table for user/finger/attendance/face counts.

    Returns:
        A :class:`~openzk.models.DeviceInfo` with serial, firmware,
        platform, the four record counts, and the full capabilities map.
    """
    c = self._caps
    users_count, fingers_count, attendance_count, faces_count = self._read_sizes()
    return DeviceInfo(
        ip=getattr(self.transport, "host", ""),
        serial=c.get("~SerialNumber", self.option("~SerialNumber") or ""),
        firmware=self.option("FirmVer") or "",
        platform=c.get("~Platform", ""),
        users_count=users_count, attendance_count=attendance_count,
        faces_count=faces_count, fingers_count=fingers_count,
        capabilities=dict(c),
    )

users

users() -> list[User]

Read all enrolled users from the device.

Returns:

Type Description
list[User]

A list of :class:~openzk.models.User objects (empty if none).

Source code in src/openzk/client.py
def users(self) -> list[User]:
    """Read all enrolled users from the device.

    Returns:
        A list of :class:`~openzk.models.User` objects (empty if none).
    """
    blob = read_buffer(self.transport, CMD_DB_RRQ, FCT_USER, 0)
    total = unpack("<I", blob[:4])[0] if len(blob) >= 4 else 0
    count = max(1, total // 72) if total else 1
    return parse_users(blob, count=count)

attendance

attendance() -> Iterator[Attendance]

Stream attendance (punch) records from the device.

Records are yielded lazily as they are parsed rather than materialized all at once, which keeps memory bounded on devices with large logs.

Yields:

Type Description
Attendance

class:~openzk.models.Attendance records in device order.

Example

with ZKDevice(host) as dev: ... for rec in dev.attendance(): ... print(rec.user_id, rec.timestamp)

Source code in src/openzk/client.py
def attendance(self) -> Iterator[Attendance]:
    """Stream attendance (punch) records from the device.

    Records are yielded lazily as they are parsed rather than materialized
    all at once, which keeps memory bounded on devices with large logs.

    Yields:
        :class:`~openzk.models.Attendance` records in device order.

    Example:
        >>> with ZKDevice(host) as dev:
        ...     for rec in dev.attendance():
        ...         print(rec.user_id, rec.timestamp)
    """
    blob = read_buffer(self.transport, CMD_DB_RRQ, FCT_ATTLOG, 0)
    total = unpack("<I", blob[:4])[0] if len(blob) >= 4 else 0
    count = max(1, total // 40) if total else 1
    yield from parse_attendance(blob, count=count)

op_logs

op_logs() -> Iterator[OpLog]

Stream device operation-log entries.

Yielded lazily as they are parsed, like :meth:attendance.

Yields:

Type Description
OpLog

class:~openzk.models.OpLog records in device order.

Source code in src/openzk/client.py
def op_logs(self) -> Iterator[OpLog]:
    """Stream device operation-log entries.

    Yielded lazily as they are parsed, like :meth:`attendance`.

    Yields:
        :class:`~openzk.models.OpLog` records in device order.
    """
    blob = read_buffer(self.transport, CMD_DB_RRQ, FCT_OPLOG, 0)
    yield from parse_op_logs(blob)

user_photo

user_photo(user_id: str) -> bytes | None

Fetch a user's enrollment portrait as JPEG bytes.

Parameters:

Name Type Description Default
user_id str

The device user ID (e.g. "8").

required

Returns:

Type Description
bytes | None

The JPEG bytes, or None if the device has no photo for this user.

Raises:

Type Description
ZKNotSupported

The device returned an unexpected response code.

ZKProtocolError

The assembled photo size did not match the header.

Example

with ZKDevice(host) as dev: ... jpeg = dev.user_photo("8")

Source code in src/openzk/client.py
def user_photo(self, user_id: str) -> bytes | None:
    """Fetch a user's enrollment portrait as JPEG bytes.

    Args:
        user_id: The device user ID (e.g. ``"8"``).

    Returns:
        The JPEG bytes, or ``None`` if the device has no photo for this user.

    Raises:
        ZKNotSupported: The device returned an unexpected response code.
        ZKProtocolError: The assembled photo size did not match the header.

    Example:
        >>> with ZKDevice(host) as dev:
        ...     jpeg = dev.user_photo("8")
    """
    op = OPCODES.get("DownloadUserPhoto", 0x271A)
    frames = self.transport.capture(op, f"{user_id}.jpg\x00".encode("latin-1"))
    if not frames:
        return None
    head = frames[0]
    if head.cmd in (CMD_NO_PIC, CMD_ACK_OK):
        return None
    if head.cmd != CMD_RECV_HEADER:
        raise ZKNotSupported("DownloadUserPhoto", head.cmd)
    declared = unpack("<I", head.data[:4])[0] if len(head.data) >= 4 else 0
    content = first_data(frames, CMD_RECV_CONTENT)
    if declared and len(content) != declared:
        raise ZKProtocolError(
            f"photo size mismatch: header={declared} got={len(content)}")
    return content or None

face_templates

face_templates() -> list[FaceTemplate]

Read enrolled face templates for every user.

Returns:

Type Description
list[FaceTemplate]

A list of :class:~openzk.models.FaceTemplate; only users with a

list[FaceTemplate]

non-empty template are included.

Raises:

Type Description
ZKNotSupported

The device lacks face support (capability FaceFunOn is not "1").

Source code in src/openzk/client.py
def face_templates(self) -> list[FaceTemplate]:
    """Read enrolled face templates for every user.

    Returns:
        A list of :class:`~openzk.models.FaceTemplate`; only users with a
        non-empty template are included.

    Raises:
        ZKNotSupported: The device lacks face support (capability
            ``FaceFunOn`` is not ``"1"``).
    """
    if self._caps.get("FaceFunOn") != "1":
        raise ZKNotSupported("GetUserFace", 0x1375)
    import struct
    op = OPCODES.get("GetUserFace", 0x0096)
    out: list[FaceTemplate] = []
    for u in self.users():
        pid = str(u.user_id).encode()
        payload = pid + struct.pack(f"<{24 - len(pid)}xxB2x", 50)
        frames = self.transport.capture(op, payload)
        data = first_data(frames, CMD_RECV_CONTENT)
        if data:
            out.append(FaceTemplate(uid=u.uid, data=data))
    return out

fingerprint_templates

fingerprint_templates() -> list[FingerTemplate]

Read all enrolled fingerprint templates.

Returns:

Type Description
list[FingerTemplate]

A list of :class:~openzk.models.FingerTemplate (empty if none).

Source code in src/openzk/client.py
def fingerprint_templates(self) -> list[FingerTemplate]:
    """Read all enrolled fingerprint templates.

    Returns:
        A list of :class:`~openzk.models.FingerTemplate` (empty if none).
    """
    blob = read_buffer(self.transport, CMD_DB_RRQ, FCT_FINGERTMP, 0)
    if len(blob) <= 4:
        return []
    body = blob[4:]
    out: list[FingerTemplate] = []
    while len(body) >= 6:
        size = unpack("<H", body[:2])[0]
        if size < 6 or size > len(body):
            break
        uid, fid, _valid = unpack("<HHb", body[2:7])
        out.append(FingerTemplate(uid=uid, finger_index=fid, data=body[6:size]))
        body = body[size:]
    return out

Models

models

Frozen dataclass models for the openzk client.

These are plain, immutable value objects returned by :class:openzk.client.ZKDevice. They perform no I/O and hold only decoded device data.

DeviceInfo dataclass

DeviceInfo(ip: str, serial: str, firmware: str, platform: str, users_count: int, attendance_count: int, faces_count: int, fingers_count: int, capabilities: dict[str, str] = dict())

Device identity and record counts.

Attributes:

Name Type Description
ip str

The device IP/hostname the client connected to.

serial str

Device serial number (~SerialNumber).

firmware str

Firmware version string (FirmVer).

platform str

Hardware/firmware platform string (~Platform).

users_count int

Number of enrolled users.

attendance_count int

Number of stored attendance records.

faces_count int

Number of enrolled face templates.

fingers_count int

Number of enrolled fingerprint templates.

capabilities dict[str, str]

Raw capability option map read from the device.

User dataclass

User(uid: int, user_id: str, name: str, privilege: int, password: str, group_id: str, card: int)

An enrolled device user.

Attributes:

Name Type Description
uid int

Internal numeric record index used by the device.

user_id str

Human-facing user ID (badge/PIN), as a string.

name str

Display name; falls back to NN-<user_id> when blank.

privilege int

Privilege level code (e.g. user vs. admin).

password str

User PIN/password, if set.

group_id str

Access group identifier.

card int

RFID card number (0 when none).

Attendance dataclass

Attendance(user_id: str, timestamp: datetime, status: int, punch: int)

A single attendance (punch) record.

Attributes:

Name Type Description
user_id str

The user ID that produced the punch.

timestamp datetime

When the punch occurred.

status int

Verification status code (e.g. fingerprint, face, password).

punch int

Punch state code (e.g. check-in/check-out).

OpLog dataclass

OpLog(op: int, timestamp: datetime, target_uid: int, raw: bytes)

A device operation-log entry.

Attributes:

Name Type Description
op int

Operation code identifying what occurred.

timestamp datetime

When the operation occurred.

target_uid int

The UID affected by the operation, when applicable.

raw bytes

The original 16-byte record, preserved for further analysis.

FingerTemplate dataclass

FingerTemplate(uid: int, finger_index: int, data: bytes)

An enrolled fingerprint template.

Attributes:

Name Type Description
uid int

The user's internal numeric record index.

finger_index int

Which finger this template represents (0-9).

data bytes

The raw template bytes.

FaceTemplate dataclass

FaceTemplate(uid: int, data: bytes)

An enrolled face template.

Attributes:

Name Type Description
uid int

The user's internal numeric record index.

data bytes

The raw face-template bytes.

UserPhoto dataclass

UserPhoto(user_id: str, data: bytes)

A user's enrollment portrait.

Attributes:

Name Type Description
user_id str

The user ID the photo belongs to.

data bytes

The raw JPEG image bytes.

byte_size property

byte_size: int

Size of the photo in bytes.

Returns:

Type Description
int

The length of :attr:data.

sha256 property

sha256: str

Content hash of the photo.

Returns:

Type Description
str

The hex-encoded SHA-256 digest of :attr:data.

Errors

errors

Typed exceptions for openzk.

ZKError

Bases: Exception

Base class for all openzk errors; catch this to handle any failure.

ZKUnreachable

Bases: ZKError

Raised when the device socket cannot be reached (timeout/refused/reset).

ZKAuthFailed

Bases: ZKError

Raised when the device rejects the connection or comm key during auth.

ZKProtocolError

Bases: ZKError

Raised on malformed framing, bad magic, truncation, or a size mismatch.

ZKNotSupported

ZKNotSupported(function: str, response_code: int)

Bases: ZKError

Raised when the device replies that a requested command is unsupported.

Initialize the error.

Parameters:

Name Type Description Default
function str

Name of the requested device function/command.

required
response_code int

The unexpected response code the device returned.

required
Source code in src/openzk/errors.py
def __init__(self, function: str, response_code: int) -> None:
    """Initialize the error.

    Args:
        function: Name of the requested device function/command.
        response_code: The unexpected response code the device returned.
    """
    self.function = function
    self.response_code = response_code
    super().__init__(f"{function!r} not supported by device (resp={response_code:#06x})")

Transport

transport

Transport layer: abstract Transport, TCP implementation, command helpers.

Transport

Bases: ABC

Abstract command transport for a ZK device.

Implementations move :class:~openzk.framing.Frame packets to and from a device. The library depends only on this interface, which allows real sockets and in-memory fakes (for tests) to be used interchangeably.

connect abstractmethod

connect() -> None

Establish the connection and authenticate.

Source code in src/openzk/transport.py
@abc.abstractmethod
def connect(self) -> None:
    """Establish the connection and authenticate."""

disconnect abstractmethod

disconnect() -> None

Tear down the connection, releasing any resources.

Source code in src/openzk/transport.py
@abc.abstractmethod
def disconnect(self) -> None:
    """Tear down the connection, releasing any resources."""

capture abstractmethod

capture(command: int, data: bytes = b'') -> list[Frame]

Send a command and collect the response frames.

Parameters:

Name Type Description Default
command int

The ZK command code to send.

required
data bytes

Optional command payload.

b''

Returns:

Type Description
list[Frame]

The first response frame followed by any subsequent (streamed)

list[Frame]

frames the device sends before going idle.

Source code in src/openzk/transport.py
@abc.abstractmethod
def capture(self, command: int, data: bytes = b"") -> list[Frame]:
    """Send a command and collect the response frames.

    Args:
        command: The ZK command code to send.
        data: Optional command payload.

    Returns:
        The first response frame followed by any subsequent (streamed)
        frames the device sends before going idle.
    """

TcpTransport

TcpTransport(host: str, port: int = 4370, timeout: float = 10.0, password: int = 0, idle: float = 1.5)

Bases: Transport

TCP implementation of :class:Transport for ZK devices.

Speaks the official ZK TCP framing: a connect/auth handshake on :meth:connect, request/response with a rolling reply id, and idle-based detection of the end of a streamed response in :meth:capture.

Initialize the TCP transport.

Parameters:

Name Type Description Default
host str

Device IP address or hostname.

required
port int

Device TCP service port (ZKTeco default 4370).

4370
timeout float

Socket connect/read timeout in seconds.

10.0
password int

Numeric comm key used for CMD_AUTH; 0 when unset.

0
idle float

Seconds of silence used to detect the end of a streamed multi-frame response in :meth:capture.

1.5
Source code in src/openzk/transport.py
def __init__(self, host: str, port: int = 4370, timeout: float = 10.0,
             password: int = 0, idle: float = 1.5) -> None:
    """Initialize the TCP transport.

    Args:
        host: Device IP address or hostname.
        port: Device TCP service port (ZKTeco default ``4370``).
        timeout: Socket connect/read timeout in seconds.
        password: Numeric comm key used for CMD_AUTH; ``0`` when unset.
        idle: Seconds of silence used to detect the end of a streamed
            multi-frame response in :meth:`capture`.
    """
    self.host = host
    self.port = port
    self.timeout = timeout
    self.password = password
    self.idle = idle
    self._sock: socket.socket | None = None
    self.session_id = 0
    self.reply_id = USHRT_MAX - 1

connect

connect() -> None

Open the socket and perform connect + optional auth.

Sends CMD_CONNECT; if the device replies CMD_ACK_UNAUTH, follows up with CMD_AUTH using the scrambled comm key.

Raises:

Type Description
ZKUnreachable

The socket connection could not be established.

ZKAuthFailed

The device rejected the connection or comm key.

Source code in src/openzk/transport.py
def connect(self) -> None:
    """Open the socket and perform connect + optional auth.

    Sends CMD_CONNECT; if the device replies CMD_ACK_UNAUTH, follows up with
    CMD_AUTH using the scrambled comm key.

    Raises:
        ZKUnreachable: The socket connection could not be established.
        ZKAuthFailed: The device rejected the connection or comm key.
    """
    try:
        self._sock = socket.create_connection((self.host, self.port), timeout=self.timeout)
    except OSError as e:
        raise ZKUnreachable(f"cannot reach {self.host}:{self.port}: {e}") from e
    self.session_id = 0
    self.reply_id = USHRT_MAX - 1
    frame = self._exchange(CMD_CONNECT)
    self.session_id = frame.session
    if frame.cmd == CMD_ACK_UNAUTH:
        frame = self._exchange(CMD_AUTH, make_commkey(self.password, self.session_id))
    if frame.cmd != CMD_ACK_OK:
        raise ZKAuthFailed(f"connect rejected: cmd={frame.cmd:#06x}")

disconnect

disconnect() -> None

Send CMD_EXIT and close the socket.

Errors during the exit exchange are suppressed; the socket is always closed. Safe to call when already disconnected.

Source code in src/openzk/transport.py
def disconnect(self) -> None:
    """Send CMD_EXIT and close the socket.

    Errors during the exit exchange are suppressed; the socket is always
    closed. Safe to call when already disconnected.
    """
    if self._sock is not None:
        try:
            self._exchange(CMD_EXIT)
        except (OSError, ZKProtocolError):
            pass
        self._sock.close()
        self._sock = None

capture

capture(command: int, data: bytes = b'') -> list[Frame]

Send a command and read until the device goes idle.

Sends one request, then keeps reading frames using the idle timeout to detect the end of a streamed response.

Parameters:

Name Type Description Default
command int

The ZK command code to send.

required
data bytes

Optional command payload.

b''

Returns:

Type Description
list[Frame]

The first response frame followed by any streamed follow-on frames.

Raises:

Type Description
ZKProtocolError

A frame had bad magic or the socket closed mid-read.

Source code in src/openzk/transport.py
def capture(self, command: int, data: bytes = b"") -> list[Frame]:
    """Send a command and read until the device goes idle.

    Sends one request, then keeps reading frames using the ``idle`` timeout
    to detect the end of a streamed response.

    Args:
        command: The ZK command code to send.
        data: Optional command payload.

    Returns:
        The first response frame followed by any streamed follow-on frames.

    Raises:
        ZKProtocolError: A frame had bad magic or the socket closed
            mid-read.
    """
    assert self._sock is not None
    frames = [self._exchange(command, data)]
    self._sock.settimeout(self.idle)
    try:
        while True:
            try:
                f = self._recv_frame()
            except (TimeoutError, OSError):
                break
            self.reply_id = f.reply
            frames.append(f)
    finally:
        self._sock.settimeout(self.timeout)
    return frames

first_data

first_data(frames: list[Frame], want_cmd: int) -> bytes

Concatenate the payloads of all frames matching a command.

Parameters:

Name Type Description Default
frames list[Frame]

The frames to scan.

required
want_cmd int

The command code to match.

required

Returns:

Type Description
bytes

The concatenated data of every frame whose cmd equals

bytes

want_cmd (empty bytes if none match).

Source code in src/openzk/transport.py
def first_data(frames: list[Frame], want_cmd: int) -> bytes:
    """Concatenate the payloads of all frames matching a command.

    Args:
        frames: The frames to scan.
        want_cmd: The command code to match.

    Returns:
        The concatenated ``data`` of every frame whose ``cmd`` equals
        ``want_cmd`` (empty bytes if none match).
    """
    return b"".join(f.data for f in frames if f.cmd == want_cmd)

read_buffer

read_buffer(transport: Transport, command: int, fct: int, ext: int = 0) -> bytes

Read a device table via the buffered SDK flow.

Issues CMD_PREPARE_BUFFER for (command, fct, ext), then pulls the result in MAX_CHUNK-sized CMD_READ_BUFFER requests and frees the buffer. Mirrors the official SDK / pyzk read_with_buffer flow.

Parameters:

Name Type Description Default
transport Transport

The transport used to exchange frames.

required
command int

The underlying data command (e.g. CMD_DB_RRQ).

required
fct int

The table/function selector for the read.

required
ext int

Optional extension parameter passed to PREPARE_BUFFER.

0

Returns:

Type Description
bytes

The assembled table blob, or empty bytes if the device reports no data.

Source code in src/openzk/transport.py
def read_buffer(transport: Transport, command: int, fct: int, ext: int = 0) -> bytes:
    """Read a device table via the buffered SDK flow.

    Issues CMD_PREPARE_BUFFER for ``(command, fct, ext)``, then pulls the result
    in ``MAX_CHUNK``-sized CMD_READ_BUFFER requests and frees the buffer. Mirrors
    the official SDK / pyzk ``read_with_buffer`` flow.

    Args:
        transport: The transport used to exchange frames.
        command: The underlying data command (e.g. CMD_DB_RRQ).
        fct: The table/function selector for the read.
        ext: Optional extension parameter passed to PREPARE_BUFFER.

    Returns:
        The assembled table blob, or empty bytes if the device reports no data.
    """
    cs = pack("<bhii", 1, command, fct, ext)
    frames = transport.capture(CMD_PREPARE_BUFFER, cs)
    if not frames:
        return b""
    head = frames[0]
    if head.cmd == CMD_DATA:
        return first_data(frames, CMD_DATA)
    if head.cmd != CMD_ACK_OK or len(head.data) < 5:
        return b""
    size = unpack("<I", head.data[1:5])[0]
    if size == 0:
        return b""
    out = bytearray()
    start = 0
    while start < size:
        this = min(MAX_CHUNK, size - start)
        chunk_frames = transport.capture(CMD_READ_BUFFER, pack("<ii", start, this))
        out += first_data(chunk_frames, CMD_DATA)
        start += this
    transport.capture(CMD_FREE_DATA)
    return bytes(out)

Session

session

Session: connect + AUTH + SDKBuild handshake + capability read, restored on close.

Session

Session(transport: Transport)

Manage the device session lifecycle and option access.

Wraps a :class:~openzk.transport.Transport to read/write device options and to perform the SDK handshake required for many table reads. On :meth:open the original SDKBuild value is saved and set to "1"; :meth:close restores it before disconnecting so the device is left in its prior state.

Initialize the session.

Parameters:

Name Type Description Default
transport Transport

The transport used to exchange command frames.

required
Source code in src/openzk/session.py
def __init__(self, transport: Transport) -> None:
    """Initialize the session.

    Args:
        transport: The transport used to exchange command frames.
    """
    self.transport = transport
    self._orig_sdkbuild: str | None = None

read_option

read_option(name: str) -> str | None

Read a single device option via CMD_OPTIONS_RRQ.

Parameters:

Name Type Description Default
name str

The option key to query.

required

Returns:

Type Description
str | None

The option value (text after =, up to the NUL terminator), or

str | None

None if the device did not acknowledge the request.

Source code in src/openzk/session.py
def read_option(self, name: str) -> str | None:
    """Read a single device option via CMD_OPTIONS_RRQ.

    Args:
        name: The option key to query.

    Returns:
        The option value (text after ``=``, up to the NUL terminator), or
        ``None`` if the device did not acknowledge the request.
    """
    frames = self.transport.capture(CMD_OPTIONS_RRQ, name.encode() + b"\x00")
    if frames and frames[0].cmd == CMD_ACK_OK:
        return frames[0].data.split(b"=", 1)[-1].split(b"\x00")[0].decode("latin-1", "replace")
    return None

write_option

write_option(name: str, value: str) -> None

Write a device option and refresh device state.

Sends CMD_OPTIONS_WRQ with name=value followed by CMD_REFRESHDATA.

Parameters:

Name Type Description Default
name str

The option key to set.

required
value str

The value to assign.

required
Source code in src/openzk/session.py
def write_option(self, name: str, value: str) -> None:
    """Write a device option and refresh device state.

    Sends CMD_OPTIONS_WRQ with ``name=value`` followed by CMD_REFRESHDATA.

    Args:
        name: The option key to set.
        value: The value to assign.
    """
    self.transport.capture(CMD_OPTIONS_WRQ, f"{name}={value}\x00".encode())
    self.transport.capture(CMD_REFRESHDATA)

open

open() -> None

Connect and perform the SDK handshake.

Connects the transport, saves the current SDKBuild option (defaulting to "0" if absent), then sets SDKBuild to "1" so the device serves buffered table reads.

Source code in src/openzk/session.py
def open(self) -> None:
    """Connect and perform the SDK handshake.

    Connects the transport, saves the current ``SDKBuild`` option (defaulting
    to ``"0"`` if absent), then sets ``SDKBuild`` to ``"1"`` so the device
    serves buffered table reads.
    """
    self.transport.connect()
    self._orig_sdkbuild = self.read_option("SDKBuild") or "0"
    self.write_option("SDKBuild", "1")

close

close() -> None

Restore the SDK handshake flag and disconnect.

If :meth:open ran, restores the original SDKBuild value (ignoring write errors) before disconnecting the transport.

Source code in src/openzk/session.py
def close(self) -> None:
    """Restore the SDK handshake flag and disconnect.

    If :meth:`open` ran, restores the original ``SDKBuild`` value (ignoring
    write errors) before disconnecting the transport.
    """
    if self._orig_sdkbuild is not None:
        try:
            self.write_option("SDKBuild", self._orig_sdkbuild)
        except Exception:
            pass
        self._orig_sdkbuild = None
    self.transport.disconnect()

capabilities

capabilities() -> dict[str, str]

Read the standard set of device capability options.

Queries each well-known option name and collects the ones the device answers with a non-empty value.

Returns:

Type Description
dict[str, str]

A mapping of option name to value for every present capability.

Source code in src/openzk/session.py
def capabilities(self) -> dict[str, str]:
    """Read the standard set of device capability options.

    Queries each well-known option name and collects the ones the device
    answers with a non-empty value.

    Returns:
        A mapping of option name to value for every present capability.
    """
    caps: dict[str, str] = {}
    for name in OPTION_NAMES:
        val = self.read_option(name)
        if val not in (None, ""):
            caps[name] = val  # type: ignore[assignment]
    return caps