Skip to content

Code documentation⚓︎

Switcher integration main module.

Switcher integration TCP socket API module.

Command ⚓︎

Bases: Enum

Enum for turning the device on or off.

Source code in src/aioswitcher/api/__init__.py
71
72
73
74
75
76
@unique
class Command(Enum):
    """Enum for turning the device on or off.

    The member values are the string flags that ``control_device`` formats
    into the control packet through ``command.value``.
    """

    ON = "1"  # request the device to turn on
    OFF = "0"  # request the device to turn off

SwitcherApi ⚓︎

Bases: ABC

Switcher TCP based API.

Parameters:

Name Type Description Default
ip_address str

the ip address assigned to the device.

required
device_id str

the id of the desired device.

required
device_key str

the login key of the device.

required
port int

the port of the device, default is 9957.

SWITCHER_TCP_PORT_TYPE1
Source code in src/aioswitcher/api/__init__.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
class SwitcherApi(ABC):
    """Switcher TCP based API.

    This base class implements the connection handling, the login handshake
    and the shutter ``stop`` command. Every other device operation is declared
    here as a stub that raises ``NotImplementedError``.

    The class is an asynchronous context manager: entering it connects to the
    device and leaving it disconnects.

    Args:
        ip_address: the ip address assigned to the device.
        device_id: the id of the desired device.
        device_key: the login key of the device.
        port: the port of the device, default is 9957.

    """

    def __init__(
        self,
        ip_address: str,
        device_id: str,
        device_key: str,
        port: int = SWITCHER_TCP_PORT_TYPE1,
    ) -> None:
        """Initialize the Switcher TCP connection API.

        Only stores the connection details, no socket is opened here.
        """
        self._ip_address = ip_address
        self._device_id = device_id
        self._device_key = device_key
        self._port = port
        # The reader/writer pair is only created by ``connect``.
        self._connected = False

    @property
    def connected(self) -> bool:
        """Return true if api is connected."""
        return self._connected

    async def __aenter__(self) -> "SwitcherApi":
        """Enter SwitcherApi asynchronous context manager.

        Returns:
            This instance of ``aioswitcher.api.SwitcherApi``.

        """
        await self.connect()
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Exit SwitcherApi asynchronous context manager.

        The exception details are ignored; returning ``None`` lets any
        exception raised inside the ``async with`` body propagate.
        """
        await self.disconnect()

    async def connect(self) -> None:
        """Connect to asynchronous socket and get reader and writer object."""
        logger.info("connecting to the switcher device")
        self._reader, self._writer = await open_connection(
            host=self._ip_address,
            port=self._port,
            family=AF_INET,
        )

        self._connected = True
        logger.info("switcher device connected")

    async def disconnect(self) -> None:
        """Disconnect from asynchronous socket.

        Can be called when no connection was ever opened. The ``connected``
        flag is cleared even when waiting for the writer to close raises.
        """
        # ``_writer`` only exists once ``connect`` has succeeded.
        writer = getattr(self, "_writer", None)
        try:
            if writer:
                logger.info("disconnecting from the switcher device")
                writer.close()
                await writer.wait_closed()
            else:
                logger.info("switcher device not connected")
        finally:
            # Never report a closed (or half-closed) writer as connected.
            self._connected = False

    async def _login(
        self, device_type: Union[DeviceType, None] = None
    ) -> Tuple[str, SwitcherLoginResponse]:
        """Use for sending the login packet to the device.

        Args:
            device_type: ``BREEZE``, ``RUNNER`` and ``RUNNER_MINI`` select the
                type 2 login packet, built with the device id. Any other value,
                including ``None``, selects the type 1 login packet, built with
                the device key.

        Returns:
            A tuple of the hex timestamp and an instance of ``SwitcherLoginResponse``.

        Note:
            This is a private function used by other functions, do not call this
            function directly.

        """
        timestamp = current_timestamp_to_hexadecimal()
        # Explicit membership test instead of a mixed ``and``/``or`` chain.
        uses_type2_login = device_type in (
            DeviceType.BREEZE,
            DeviceType.RUNNER,
            DeviceType.RUNNER_MINI,
        )
        if uses_type2_login:
            packet = packets.LOGIN2_PACKET_TYPE2.format(timestamp, self._device_id)
        else:
            packet = packets.LOGIN_PACKET_TYPE1.format(timestamp, self._device_key)
        signed_packet = sign_packet_with_crc_key(packet)

        logger.debug("sending a login packet")
        self._writer.write(unhexlify(signed_packet))
        response = await self._reader.read(1024)
        return timestamp, SwitcherLoginResponse(response)

    async def stop(self) -> SwitcherBaseResponse:
        """Use for stopping the shutter.

        Logs in with the ``RUNNER`` device type and then sends the stop command.

        Returns:
            An instance of ``SwitcherBaseResponse``.

        Raises:
            RuntimeError: if the login request was not successful.
        """
        logger.debug("about to send stop shutter command")
        timestamp, login_resp = await self._login(DeviceType.RUNNER)
        if not login_resp.successful:
            logger.error("Failed to log into device with id %s", self._device_id)
            raise RuntimeError("login request was not successful")

        logger.debug(
            "logged in session_id=%s, timestamp=%s", login_resp.session_id, timestamp
        )

        packet = packets.RUNNER_STOP_COMMAND.format(
            login_resp.session_id, timestamp, self._device_id
        )

        # The message length is set before the packet is signed.
        packet = set_message_length(packet)
        signed_packet = sign_packet_with_crc_key(packet)

        logger.debug("sending a stop control packet")

        self._writer.write(unhexlify(signed_packet))
        response = await self._reader.read(1024)
        return SwitcherBaseResponse(response)

    async def get_state(self) -> SwitcherStateResponse:
        """Use for sending the get state packet to the device.

        Returns:
            An instance of ``SwitcherStateResponse``.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError

    async def get_breeze_state(self) -> SwitcherThermostatStateResponse:
        """Use for sending the get state packet to the Breeze device.

        Returns:
            An instance of ``SwitcherThermostatStateResponse``.

        Raises:
            NotImplementedError: always, in this base implementation.

        """
        raise NotImplementedError

    async def control_device(
        self, command: Command, minutes: int = 0
    ) -> SwitcherBaseResponse:
        """Use for sending the control packet to the device.

        Args:
            command: use the ``aioswitcher.api.Command`` enum.
            minutes: if turning-on optionally incorporate a timer.

        Returns:
            An instance of ``SwitcherBaseResponse``.

        Raises:
            NotImplementedError: always, in this base implementation.

        """
        raise NotImplementedError

    async def control_breeze_device(
        self,
        remote: SwitcherBreezeRemote,
        state: Union[DeviceState, None] = None,
        mode: Union[ThermostatMode, None] = None,
        target_temp: int = 0,
        fan_level: Union[ThermostatFanLevel, None] = None,
        swing: Union[ThermostatSwing, None] = None,
        update_state: bool = False,
    ) -> SwitcherBaseResponse:
        """Use for sending the control packet to the Breeze device.

        Args:
            remote: the remote for the breeze device
            state: the desired state of the device
            mode: the desired mode of the device
            target_temp: the target temperature
            fan_level: the desired fan level
            swing: the desired swing state
            update_state: update the device state without controlling the device

        Returns:
            An instance of ``SwitcherBaseResponse``.

        Raises:
            NotImplementedError: always, in this base implementation.

        """
        raise NotImplementedError

    async def set_position(self, position: int = 0) -> SwitcherBaseResponse:
        """Use for setting the shutter position of the Runner and Runner Mini devices.

        Args:
            position: the position to set the device to, default to 0.

        Returns:
            An instance of ``SwitcherBaseResponse``.

        Raises:
            NotImplementedError: always, in this base implementation.

        """
        raise NotImplementedError

    async def set_device_name(self, name: str) -> SwitcherBaseResponse:
        """Use for sending the set name packet to the device.

        Args:
            name: string name with a length between 2 and 32 characters
                (2 <= x <= 32).

        Returns:
            An instance of ``SwitcherBaseResponse``.

        Raises:
            NotImplementedError: always, in this base implementation.

        """
        raise NotImplementedError

    async def set_auto_shutdown(self, full_time: timedelta) -> SwitcherBaseResponse:
        """Use for sending the set auto-off packet to the device.

        Args:
            full_time: timedelta value containing the configuration value for
                auto-shutdown.

        Returns:
            An instance of ``SwitcherBaseResponse``.

        Raises:
            NotImplementedError: always, in this base implementation.

        """
        raise NotImplementedError

    async def get_schedules(self) -> SwitcherGetSchedulesResponse:
        """Use for retrieval of the schedules from the device.

        Returns:
            An instance of ``SwitcherGetSchedulesResponse``.

        Raises:
            NotImplementedError: always, in this base implementation.

        """
        raise NotImplementedError

    async def delete_schedule(self, schedule_id: str) -> SwitcherBaseResponse:
        """Use for deleting a schedule from the device.

        Use ``get_schedules`` to retrieve the schedule instance.

        Args:
            schedule_id: the identification of the schedule for deletion.

        Returns:
            An instance of ``SwitcherBaseResponse``.

        Raises:
            NotImplementedError: always, in this base implementation.

        """
        raise NotImplementedError

    async def create_schedule(
        self, start_time: str, end_time: str, days: Set[Days] = set()
    ) -> SwitcherBaseResponse:
        """Use for creating a new schedule in the next empty schedule slot.

        Args:
            start_time: a string start time in %H:%M format. e.g. 13:00.
            end_time: a string end time in %H:%M format. e.g. 13:00.
            days: for recurring schedules, add ``Days``. The default is a
                single shared set object, implementations must not mutate it.

        Returns:
            An instance of ``SwitcherBaseResponse``.

        Raises:
            NotImplementedError: always, in this base implementation.

        """
        raise NotImplementedError

connected: bool property ⚓︎

Return true if api is connected.

__aenter__() async ⚓︎

Enter SwitcherApi asynchronous context manager.

Returns:

Type Description
SwitcherApi

This instance of aioswitcher.api.SwitcherApi.

Source code in src/aioswitcher/api/__init__.py
109
110
111
112
113
114
115
116
117
async def __aenter__(self) -> "SwitcherApi":
    """Connect to the device when entering an ``async with`` block.

    Returns:
        This instance of ``aioswitcher.api.SwitcherApi``, after ``connect``
        has been awaited.

    """
    # The connection is opened first so the body of the block can use it.
    await self.connect()
    return self

__aexit__(exc_type, exc_value, traceback) async ⚓︎

Exit SwitcherApi asynchronous context manager.

Source code in src/aioswitcher/api/__init__.py
119
120
121
122
123
124
125
126
async def __aexit__(
    self,
    exc_type: Optional[Type[BaseException]],
    exc_value: Optional[BaseException],
    traceback: Optional[TracebackType],
) -> None:
    """Disconnect from the device when leaving an ``async with`` block.

    The three exception arguments are not inspected. Nothing is returned, so
    an exception raised inside the block keeps propagating.
    """
    await self.disconnect()

__init__(ip_address, device_id, device_key, port=SWITCHER_TCP_PORT_TYPE1) ⚓︎

Initialize the Switcher TCP connection API.

Source code in src/aioswitcher/api/__init__.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def __init__(
    self,
    ip_address: str,
    device_id: str,
    device_key: str,
    port: int = SWITCHER_TCP_PORT_TYPE1,
) -> None:
    """Store the connection details of the device.

    No socket is opened here, the instance starts out disconnected.

    Args:
        ip_address: the ip address assigned to the device.
        device_id: the id of the desired device.
        device_key: the login key of the device.
        port: the port of the device.
    """
    # Connection state first: nothing is connected until ``connect`` runs.
    self._connected = False
    # Where to connect to.
    self._ip_address, self._port = ip_address, port
    # Device identity used when building the packets.
    self._device_id, self._device_key = device_id, device_key

connect() async ⚓︎

Connect to asynchronous socket and get reader and writer object.

Source code in src/aioswitcher/api/__init__.py
128
129
130
131
132
133
134
135
136
137
138
async def connect(self) -> None:
    """Open the TCP connection to the device and keep its stream pair.

    The reader and writer returned by ``open_connection`` are stored on the
    instance and the ``connected`` flag is set once the connection is open.
    """
    logger.info("connecting to the switcher device")
    # AF_INET restricts the connection to IPv4.
    streams = await open_connection(
        host=self._ip_address,
        port=self._port,
        family=AF_INET,
    )
    self._reader, self._writer = streams
    self._connected = True
    logger.info("switcher device connected")

control_breeze_device(remote, state=None, mode=None, target_temp=0, fan_level=None, swing=None, update_state=False) async ⚓︎

Use for sending the control packet to the Breeze device.

Parameters:

Name Type Description Default
remote SwitcherBreezeRemote

the remote for the breeze device

required
state Union[DeviceState, None]

the desired state of the device

None
mode Union[ThermostatMode, None]

the desired mode of the device

None
target_temp int

the target temperature

0
fan_level Union[ThermostatFanLevel, None]

the desired fan level

None
swing Union[ThermostatSwing, None]

the desired swing state

None
update_state bool

update the device state without controlling the device

False

Returns:

Type Description
SwitcherBaseResponse

An instance of SwitcherBaseResponse.

Source code in src/aioswitcher/api/__init__.py
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
async def control_breeze_device(
    self,
    remote: SwitcherBreezeRemote,
    state: Union[DeviceState, None] = None,
    mode: Union[ThermostatMode, None] = None,
    target_temp: int = 0,
    fan_level: Union[ThermostatFanLevel, None] = None,
    swing: Union[ThermostatSwing, None] = None,
    update_state: bool = False,
) -> SwitcherBaseResponse:
    """Use for sending the control packet to the Breeze device.

    This base implementation is a stub and always raises.

    Args:
        remote: the remote for the breeze device
        state: the desired state of the device
        mode: the desired mode of the device
        target_temp: the target temperature
        fan_level: the desired fan level
        swing: the desired swing state
        update_state: update the device state without controlling the device

    Returns:
        An instance of ``SwitcherBaseResponse``.

    Raises:
        NotImplementedError: always, in this base implementation.

    """
    raise NotImplementedError

control_device(command, minutes=0) async ⚓︎

Use for sending the control packet to the device.

Parameters:

Name Type Description Default
command Command

use the aioswitcher.api.Command enum.

required
minutes int

if turning-on optionally incorporate a timer.

0

Returns:

Type Description
SwitcherBaseResponse

An instance of SwitcherBaseResponse.

Source code in src/aioswitcher/api/__init__.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
async def control_device(
    self, command: Command, minutes: int = 0
) -> SwitcherBaseResponse:
    """Use for sending the control packet to the device.

    This base implementation is a stub and always raises.

    Args:
        command: use the ``aioswitcher.api.Command`` enum.
        minutes: if turning-on optionally incorporate a timer.

    Returns:
        An instance of ``SwitcherBaseResponse``.

    Raises:
        NotImplementedError: always, in this base implementation.

    """
    raise NotImplementedError

create_schedule(start_time, end_time, days=set()) async ⚓︎

Use for creating a new schedule in the next empty schedule slot.

Parameters:

Name Type Description Default
start_time str

a string start time in %H:%M format. e.g. 13:00.

required
end_time str

a string end time in %H:%M format. e.g. 13:00.

required
days Set[Days]

for recurring schedules, add Days.

set()

Returns:

Type Description
SwitcherBaseResponse

An instance of SwitcherBaseResponse.

Source code in src/aioswitcher/api/__init__.py
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
async def create_schedule(
    self, start_time: str, end_time: str, days: Set[Days] = set()
) -> SwitcherBaseResponse:
    """Use for creating a new schedule in the next empty schedule slot.

    This base implementation is a stub and always raises.

    Args:
        start_time: a string start time in %H:%M format. e.g. 13:00.
        end_time: a string end time in %H:%M format. e.g. 13:00.
        days: for recurring schedules, add ``Days``. The default is a single
            shared set object, implementations must not mutate it.

    Returns:
        An instance of ``SwitcherBaseResponse``.

    Raises:
        NotImplementedError: always, in this base implementation.

    """
    raise NotImplementedError

delete_schedule(schedule_id) async ⚓︎

Use for deleting a schedule from the device.

Use get_schedules to retrieve the schedule instance.

Parameters:

Name Type Description Default
schedule_id str

the identification of the schedule for deletion.

required

Returns:

Type Description
SwitcherBaseResponse

An instance of SwitcherBaseResponse.

Source code in src/aioswitcher/api/__init__.py
314
315
316
317
318
319
320
321
322
323
324
325
326
async def delete_schedule(self, schedule_id: str) -> SwitcherBaseResponse:
    """Use for deleting a schedule from the device.

    Use ``get_schedules`` to retrieve the schedule instance.
    This base implementation is a stub and always raises.

    Args:
        schedule_id: the identification of the schedule for deletion.

    Returns:
        An instance of ``SwitcherBaseResponse``.

    Raises:
        NotImplementedError: always, in this base implementation.

    """
    raise NotImplementedError

disconnect() async ⚓︎

Disconnect from asynchronous socket.

Source code in src/aioswitcher/api/__init__.py
140
141
142
143
144
145
146
147
148
async def disconnect(self) -> None:
    """Disconnect from asynchronous socket.

    Can be called when no connection was ever opened. The ``connected`` flag
    is cleared even when waiting for the writer to close raises, so a failed
    close never leaves the instance reporting itself as connected.
    """
    # ``_writer`` only exists once ``connect`` has succeeded.
    writer = getattr(self, "_writer", None)
    try:
        if writer:
            logger.info("disconnecting from the switcher device")
            writer.close()
            await writer.wait_closed()
        else:
            logger.info("switcher device not connected")
    finally:
        # Runs on success and on failure of ``wait_closed`` alike.
        self._connected = False

get_breeze_state() async ⚓︎

Use for sending the get state packet to the Breeze device.

Returns:

Type Description
SwitcherThermostatStateResponse

An instance of SwitcherThermostatStateResponse.

Source code in src/aioswitcher/api/__init__.py
217
218
219
220
221
222
223
224
async def get_breeze_state(self) -> SwitcherThermostatStateResponse:
    """Use for sending the get state packet to the Breeze device.

    This base implementation is a stub and always raises.

    Returns:
        An instance of ``SwitcherThermostatStateResponse``.

    Raises:
        NotImplementedError: always, in this base implementation.

    """
    raise NotImplementedError

get_schedules() async ⚓︎

Use for retrieval of the schedules from the device.

Returns:

Type Description
SwitcherGetSchedulesResponse

An instance of SwitcherGetSchedulesResponse.

Source code in src/aioswitcher/api/__init__.py
305
306
307
308
309
310
311
312
async def get_schedules(self) -> SwitcherGetSchedulesResponse:
    """Use for retrieval of the schedules from the device.

    This base implementation is a stub and always raises.

    Returns:
        An instance of ``SwitcherGetSchedulesResponse``.

    Raises:
        NotImplementedError: always, in this base implementation.

    """
    raise NotImplementedError

get_state() async ⚓︎

Use for sending the get state packet to the device.

Returns:

Type Description
SwitcherStateResponse

An instance of SwitcherStateResponse.

Source code in src/aioswitcher/api/__init__.py
209
210
211
212
213
214
215
async def get_state(self) -> SwitcherStateResponse:
    """Use for sending the get state packet to the device.

    This base implementation is a stub and always raises.

    Returns:
        An instance of ``SwitcherStateResponse``.

    Raises:
        NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError

set_auto_shutdown(full_time) async ⚓︎

Use for sending the set auto-off packet to the device.

Parameters:

Name Type Description Default
full_time timedelta

timedelta value containing the configuration value for auto-shutdown.

required

Returns:

Type Description
SwitcherBaseResponse

An instance of SwitcherBaseResponse.

Source code in src/aioswitcher/api/__init__.py
292
293
294
295
296
297
298
299
300
301
302
303
async def set_auto_shutdown(self, full_time: timedelta) -> SwitcherBaseResponse:
    """Use for sending the set auto-off packet to the device.

    This base implementation is a stub and always raises.

    Args:
        full_time: timedelta value containing the configuration value for
            auto-shutdown.

    Returns:
        An instance of ``SwitcherBaseResponse``.

    Raises:
        NotImplementedError: always, in this base implementation.

    """
    raise NotImplementedError

set_device_name(name) async ⚓︎

Use for sending the set name packet to the device.

Parameters:

Name Type Description Default
name str

string name with a length between 2 and 32 characters (2 <= x <= 32).

required

Returns:

Type Description
SwitcherBaseResponse

An instance of SwitcherBaseResponse.

Source code in src/aioswitcher/api/__init__.py
280
281
282
283
284
285
286
287
288
289
290
async def set_device_name(self, name: str) -> SwitcherBaseResponse:
    """Use for sending the set name packet to the device.

    This base implementation is a stub and always raises.

    Args:
        name: string name with a length between 2 and 32 characters
            (2 <= x <= 32).

    Returns:
        An instance of ``SwitcherBaseResponse``.

    Raises:
        NotImplementedError: always, in this base implementation.

    """
    raise NotImplementedError

set_position(position=0) async ⚓︎

Use for setting the shutter position of the Runner and Runner Mini devices.

Parameters:

Name Type Description Default
position int

the position to set the device to, default to 0.

0

Returns:

Type Description
SwitcherBaseResponse

An instance of SwitcherBaseResponse.

Source code in src/aioswitcher/api/__init__.py
268
269
270
271
272
273
274
275
276
277
278
async def set_position(self, position: int = 0) -> SwitcherBaseResponse:
    """Use for setting the shutter position of the Runner and Runner Mini devices.

    This base implementation is a stub and always raises.

    Args:
        position: the position to set the device to, default to 0.

    Returns:
        An instance of ``SwitcherBaseResponse``.

    Raises:
        NotImplementedError: always, in this base implementation.

    """
    raise NotImplementedError

stop() async ⚓︎

Use for stopping the shutter.

Returns:

Type Description
SwitcherBaseResponse

An instance of SwitcherBaseResponse.

Source code in src/aioswitcher/api/__init__.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
async def stop(self) -> SwitcherBaseResponse:
    """Log into the device and send it the stop shutter command.

    The login is performed with the ``RUNNER`` device type. The stop packet
    gets its message length set, is signed, written to the device, and up to
    1024 bytes of reply are wrapped in the returned response.

    Returns:
        An instance of ``SwitcherBaseResponse``.

    Raises:
        RuntimeError: if the login request was not successful.
    """
    logger.debug("about to send stop shutter command")
    hex_timestamp, login = await self._login(DeviceType.RUNNER)

    if login.successful:
        logger.debug(
            "logged in session_id=%s, timestamp=%s", login.session_id, hex_timestamp
        )
    else:
        logger.error("Failed to log into device with id %s", self._device_id)
        raise RuntimeError("login request was not successful")

    stop_command = packets.RUNNER_STOP_COMMAND.format(
        login.session_id, hex_timestamp, self._device_id
    )
    # The length is set first, the signature is computed over the result.
    signed_packet = sign_packet_with_crc_key(set_message_length(stop_command))

    logger.debug("sending a stop control packet")

    self._writer.write(unhexlify(signed_packet))
    raw_response = await self._reader.read(1024)
    return SwitcherBaseResponse(raw_response)

SwitcherType1Api ⚓︎

Bases: SwitcherApi

Switcher Type1 devices (Plug, V2, Touch, V4) TCP based API.

Parameters:

Name Type Description Default
ip_address str

the ip address assigned to the device.

required
device_id str

the id of the desired device.

required
device_key str

the login key of the device.

required
Source code in src/aioswitcher/api/__init__.py
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
@final
class SwitcherType1Api(SwitcherApi):
    """Switcher Type1 devices (Plug, V2, Touch, V4) TCP based API.

    Every operation first logs in with the type 1 login packet, then formats,
    signs and writes its own packet and wraps up to 1024 bytes of the reply.

    Args:
        ip_address: the ip address assigned to the device.
        device_id: the id of the desired device.
        device_key: the login key of the device.
    """

    def __init__(self, ip_address: str, device_id: str, device_key: str) -> None:
        """Initialize the Switcher TCP connection API.

        The port is always ``SWITCHER_TCP_PORT_TYPE1``.
        """
        super().__init__(ip_address, device_id, device_key, SWITCHER_TCP_PORT_TYPE1)

    async def get_state(self) -> SwitcherStateResponse:
        """Use for sending the get state packet to the device.

        Returns:
            An instance of ``SwitcherStateResponse``.

        Raises:
            RuntimeError: if the login request was not successful, or if the
                state response could not be parsed or was not successful.
        """
        timestamp, login_resp = await self._login()
        if not login_resp.successful:
            raise RuntimeError("login request was not successful")

        packet = packets.GET_STATE_PACKET_TYPE1.format(
            login_resp.session_id, timestamp, self._device_id
        )
        signed_packet = sign_packet_with_crc_key(packet)

        logger.debug("sending a get state packet")
        self._writer.write(unhexlify(signed_packet))
        state_resp = await self._reader.read(1024)
        try:
            response = SwitcherStateResponse(state_resp)
            successful = response.successful
        except (KeyError, ValueError) as ve:
            raise RuntimeError("get state request was not successful") from ve
        if not successful:
            # The login succeeded at this point, so blame the state request.
            raise RuntimeError("get state request was not successful")
        return response

    async def control_device(
        self, command: Command, minutes: int = 0
    ) -> SwitcherBaseResponse:
        """Use for sending the control packet to the device.

        Args:
            command: use the ``aioswitcher.api.Command`` enum.
            minutes: if turning-on optionally incorporate a timer. Values of
                0 or less send the packet without a timer.

        Returns:
            An instance of ``SwitcherBaseResponse``.

        """
        timestamp, login_resp = await self._login()
        timer = (
            minutes_to_hexadecimal_seconds(minutes)
            if minutes > 0
            else packets.NO_TIMER_REQUESTED
        )
        packet = packets.SEND_CONTROL_PACKET.format(
            login_resp.session_id,
            timestamp,
            self._device_id,
            command.value,
            timer,
        )
        signed_packet = sign_packet_with_crc_key(packet)

        logger.debug("sending a control packet")
        self._writer.write(unhexlify(signed_packet))
        response = await self._reader.read(1024)
        return SwitcherBaseResponse(response)

    async def set_auto_shutdown(self, full_time: timedelta) -> SwitcherBaseResponse:
        """Use for sending the set auto-off packet to the device.

        Args:
            full_time: timedelta value containing the configuration value for
                auto-shutdown.

        Returns:
            An instance of ``SwitcherBaseResponse``.

        """
        timestamp, login_resp = await self._login()
        auto_shutdown = timedelta_to_hexadecimal_seconds(full_time)
        packet = packets.SET_AUTO_OFF_SET_PACKET.format(
            login_resp.session_id,
            timestamp,
            self._device_id,
            auto_shutdown,
        )
        signed_packet = sign_packet_with_crc_key(packet)

        logger.debug("sending a set auto shutdown packet")
        self._writer.write(unhexlify(signed_packet))
        response = await self._reader.read(1024)
        return SwitcherBaseResponse(response)

    async def set_device_name(self, name: str) -> SwitcherBaseResponse:
        """Use for sending the set name packet to the device.

        Args:
            name: string name with a length between 2 and 32 characters
                (2 <= x <= 32).

        Returns:
            An instance of ``SwitcherBaseResponse``.

        """
        timestamp, login_resp = await self._login()
        device_name = string_to_hexadecimale_device_name(name)
        packet = packets.UPDATE_DEVICE_NAME_PACKET.format(
            login_resp.session_id,
            timestamp,
            self._device_id,
            device_name,
        )
        signed_packet = sign_packet_with_crc_key(packet)

        logger.debug("sending a set name packet")
        self._writer.write(unhexlify(signed_packet))
        response = await self._reader.read(1024)
        return SwitcherBaseResponse(response)

    async def get_schedules(self) -> SwitcherGetSchedulesResponse:
        """Use for retrieval of the schedules from the device.

        Returns:
            An instance of ``SwitcherGetSchedulesResponse``.

        """
        timestamp, login_resp = await self._login()
        packet = packets.GET_SCHEDULES_PACKET.format(
            login_resp.session_id,
            timestamp,
            self._device_id,
        )
        signed_packet = sign_packet_with_crc_key(packet)

        logger.debug("sending a get schedules packet")
        self._writer.write(unhexlify(signed_packet))
        response = await self._reader.read(1024)
        return SwitcherGetSchedulesResponse(response)

    async def delete_schedule(self, schedule_id: str) -> SwitcherBaseResponse:
        """Use for deleting a schedule from the device.

        Use ``get_schedules`` to retrieve the schedule instance.

        Args:
            schedule_id: the identification of the schedule for deletion.

        Returns:
            An instance of ``SwitcherBaseResponse``.

        """
        timestamp, login_resp = await self._login()
        packet = packets.DELETE_SCHEDULE_PACKET.format(
            login_resp.session_id, timestamp, self._device_id, schedule_id
        )
        signed_packet = sign_packet_with_crc_key(packet)

        logger.debug("sending a delete schedule packet")
        self._writer.write(unhexlify(signed_packet))
        response = await self._reader.read(1024)
        return SwitcherBaseResponse(response)

    async def create_schedule(
        self, start_time: str, end_time: str, days: Set[Days] = set()
    ) -> SwitcherBaseResponse:
        """Use for creating a new schedule in the next empty schedule slot.

        Args:
            start_time: a string start time in %H:%M format. e.g. 13:00.
            end_time: a string end time in %H:%M format. e.g. 13:00.
            days: for recurring schedules, add ``Days``. An empty set creates
                a non-recurring schedule. The default is a single shared set
                object and is only read here.

        Returns:
            An instance of ``SwitcherBaseResponse``.

        """
        timestamp, login_resp = await self._login()

        start_time_hex = time_to_hexadecimal_timestamp(start_time)
        end_time_hex = time_to_hexadecimal_timestamp(end_time)
        # No days given means a one-off schedule.
        weekdays = (
            weekdays_to_hexadecimal(days) if days else packets.NON_RECURRING_SCHEDULE
        )
        new_schedule = packets.SCHEDULE_CREATE_DATA_FORMAT.format(
            weekdays, start_time_hex, end_time_hex
        )
        packet = packets.CREATE_SCHEDULE_PACKET.format(
            login_resp.session_id,
            timestamp,
            self._device_id,
            new_schedule,
        )
        signed_packet = sign_packet_with_crc_key(packet)

        logger.debug("sending a create schedule packet")
        self._writer.write(unhexlify(signed_packet))
        response = await self._reader.read(1024)
        return SwitcherBaseResponse(response)

__init__(ip_address, device_id, device_key) ⚓︎

Initialize the Switcher TCP connection API.

Source code in src/aioswitcher/api/__init__.py
355
356
357
def __init__(self, ip_address: str, device_id: str, device_key: str) -> None:
    """Initialize the API for a Type1 device using the Type1 TCP port."""
    # The port is not configurable here, it is always the Type1 port.
    super().__init__(
        ip_address=ip_address,
        device_id=device_id,
        device_key=device_key,
        port=SWITCHER_TCP_PORT_TYPE1,
    )

control_device(command, minutes=0) async ⚓︎

Use for sending the control packet to the device.

Parameters:

Name Type Description Default
command Command

use the aioswitcher.api.Command enum.

required
minutes int

if turning-on optionally incorporate a timer.

0

Returns:

Type Description
SwitcherBaseResponse

An instance of SwitcherBaseResponse.

Source code in src/aioswitcher/api/__init__.py
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
async def control_device(
    self, command: Command, minutes: int = 0
) -> SwitcherBaseResponse:
    """Log in, then send an on/off control packet with an optional timer.

    Args:
        command: use the ``aioswitcher.api.Command`` enum.
        minutes: if turning-on optionally incorporate a timer.

    Returns:
        An instance of ``SwitcherBaseResponse``.

    """
    # NOTE(review): the login result is not checked for success here.
    timestamp, login_resp = await self._login()

    # Only a positive number of minutes is encoded as a timer.
    if minutes > 0:
        timer = minutes_to_hexadecimal_seconds(minutes)
    else:
        timer = packets.NO_TIMER_REQUESTED

    unsigned = packets.SEND_CONTROL_PACKET.format(
        login_resp.session_id, timestamp, self._device_id, command.value, timer
    )
    signed = sign_packet_with_crc_key(unsigned)

    logger.debug("sending a control packet")
    self._writer.write(unhexlify(signed))
    raw_reply = await self._reader.read(1024)
    return SwitcherBaseResponse(raw_reply)

create_schedule(start_time, end_time, days=set()) async ⚓︎

Use for creating a new schedule in the next empty schedule slot.

Parameters:

Name Type Description Default
start_time str

a string start time in %H:%M format. e.g. 13:00.

required
end_time str

a string end time in %H:%M format. e.g. 13:00.

required
days Set[Days]

for recurring schedules, add Days.

set()

Returns:

Type Description
SwitcherBaseResponse

An instance of SwitcherBaseResponse.

Source code in src/aioswitcher/api/__init__.py
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
async def create_schedule(
    self, start_time: str, end_time: str, days: Set[Days] = set()
) -> SwitcherBaseResponse:
    """Log in and create a new schedule in the next empty schedule slot.

    Args:
        start_time: a string start time in %H:%M format. e.g. 13:00.
        end_time: a string end time in %H:%M format. e.g. 13:00.
        days: for recurring schedules, add ``Days``.

    Returns:
        An instance of ``SwitcherBaseResponse``.

    """
    # NOTE(review): the ``set()`` default is a shared mutable object; this
    # function only reads it, but confirm ``weekdays_to_hexadecimal`` does not
    # mutate its argument.
    timestamp, login_resp = await self._login()

    start_hex = time_to_hexadecimal_timestamp(start_time)
    end_hex = time_to_hexadecimal_timestamp(end_time)

    # An empty ``days`` collection selects the non-recurring marker.
    if len(days) > 0:
        weekdays = weekdays_to_hexadecimal(days)
    else:
        weekdays = packets.NON_RECURRING_SCHEDULE

    schedule_data = packets.SCHEDULE_CREATE_DATA_FORMAT.format(
        weekdays, start_hex, end_hex
    )
    unsigned = packets.CREATE_SCHEDULE_PACKET.format(
        login_resp.session_id, timestamp, self._device_id, schedule_data
    )
    signed = sign_packet_with_crc_key(unsigned)

    logger.debug("sending a create schedule packet")
    self._writer.write(unhexlify(signed))
    raw_reply = await self._reader.read(1024)
    return SwitcherBaseResponse(raw_reply)

delete_schedule(schedule_id) async ⚓︎

Use for deleting a schedule from the device.

Use get_schedules to retrieve the schedule instance.

Parameters:

Name Type Description Default
schedule_id str

the identification of the schedule for deletion.

required

Returns:

Type Description
SwitcherBaseResponse

An instance of SwitcherBaseResponse.

Source code in src/aioswitcher/api/__init__.py
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
async def delete_schedule(self, schedule_id: str) -> SwitcherBaseResponse:
    """Log in and send a delete schedule packet for the given schedule.

    Use ``get_schedules`` to retrieve the schedule instance.

    Args:
        schedule_id: the identification of the schedule for deletion.

    Returns:
        An instance of ``SwitcherBaseResponse``.

    """
    timestamp, login_resp = await self._login()
    unsigned = packets.DELETE_SCHEDULE_PACKET.format(
        login_resp.session_id,
        timestamp,
        self._device_id,
        schedule_id,
    )
    signed = sign_packet_with_crc_key(unsigned)

    logger.debug("sending a delete schedule packet")
    self._writer.write(unhexlify(signed))
    raw_reply = await self._reader.read(1024)
    return SwitcherBaseResponse(raw_reply)

get_schedules() async ⚓︎

Use for retrieval of the schedules from the device.

Returns:

Type Description
SwitcherGetSchedulesResponse

An instance of SwitcherGetSchedulesResponse.

Source code in src/aioswitcher/api/__init__.py
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
async def get_schedules(self) -> SwitcherGetSchedulesResponse:
    """Log in and request the schedules stored on the device.

    Returns:
        An instance of ``SwitcherGetSchedulesResponse``.

    """
    timestamp, login_resp = await self._login()
    unsigned = packets.GET_SCHEDULES_PACKET.format(
        login_resp.session_id, timestamp, self._device_id
    )
    signed = sign_packet_with_crc_key(unsigned)

    logger.debug("sending a get schedules packet")
    self._writer.write(unhexlify(signed))
    raw_reply = await self._reader.read(1024)
    return SwitcherGetSchedulesResponse(raw_reply)

get_state() async ⚓︎

Use for sending the get state packet to the device.

Returns:

Type Description
SwitcherStateResponse

An instance of SwitcherStateResponse.

Source code in src/aioswitcher/api/__init__.py
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
async def get_state(self) -> SwitcherStateResponse:
    """Use for sending the get state packet to the device.

    Returns:
        An instance of ``SwitcherStateResponse``.

    Raises:
        RuntimeError: if the login was not successful, if the state response
            could not be parsed, or if the parsed state response reports that
            it was not successful.

    """
    timestamp, login_resp = await self._login()
    if not login_resp.successful:
        raise RuntimeError("login request was not successful")

    packet = packets.GET_STATE_PACKET_TYPE1.format(
        login_resp.session_id, timestamp, self._device_id
    )
    signed_packet = sign_packet_with_crc_key(packet)

    logger.debug("sending a get state packet")
    self._writer.write(unhexlify(signed_packet))
    state_resp = await self._reader.read(1024)
    try:
        response = SwitcherStateResponse(state_resp)
        # Read inside the ``try`` so parsing errors surfacing here are wrapped too.
        state_ok = response.successful
    except (KeyError, ValueError) as ve:
        raise RuntimeError("get state request was not successful") from ve

    if not state_ok:
        # Previously this case fell through to the "login request" error even
        # though the login had succeeded; report the failing step instead.
        raise RuntimeError("get state request was not successful")
    return response

set_auto_shutdown(full_time) async ⚓︎

Use for sending the set auto-off packet to the device.

Parameters:

Name Type Description Default
full_time timedelta

timedelta value containing the configuration value for auto-shutdown.

required

Returns:

Type Description
SwitcherBaseResponse

An instance of SwitcherBaseResponse.

Source code in src/aioswitcher/api/__init__.py
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
async def set_auto_shutdown(self, full_time: timedelta) -> SwitcherBaseResponse:
    """Log in and send the set auto-off packet to the device.

    Args:
        full_time: timedelta value containing the configuration value for
            auto-shutdown.

    Returns:
        An instance of ``SwitcherBaseResponse``.

    """
    timestamp, login_resp = await self._login()
    shutdown_hex = timedelta_to_hexadecimal_seconds(full_time)
    unsigned = packets.SET_AUTO_OFF_SET_PACKET.format(
        login_resp.session_id, timestamp, self._device_id, shutdown_hex
    )
    signed = sign_packet_with_crc_key(unsigned)

    logger.debug("sending a set auto shutdown packet")
    self._writer.write(unhexlify(signed))
    raw_reply = await self._reader.read(1024)
    return SwitcherBaseResponse(raw_reply)

set_device_name(name) async ⚓︎

Use for sending the set name packet to the device.

Parameters:

Name Type Description Default
name str

string name with the length of 2 <= x <= 32.

required

Returns:

Type Description
SwitcherBaseResponse

An instance of SwitcherBaseResponse.

Source code in src/aioswitcher/api/__init__.py
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
async def set_device_name(self, name: str) -> SwitcherBaseResponse:
    """Log in and send the set name packet to the device.

    Args:
        name: string name with the length of 2 <= x <= 32.

    Returns:
        An instance of ``SwitcherBaseResponse``.

    """
    timestamp, login_resp = await self._login()
    name_hex = string_to_hexadecimale_device_name(name)
    unsigned = packets.UPDATE_DEVICE_NAME_PACKET.format(
        login_resp.session_id, timestamp, self._device_id, name_hex
    )
    signed = sign_packet_with_crc_key(unsigned)

    logger.debug("sending a set name packet")
    self._writer.write(unhexlify(signed))
    raw_reply = await self._reader.read(1024)
    return SwitcherBaseResponse(raw_reply)

SwitcherType2Api ⚓︎

Bases: SwitcherApi

Switcher Type2 devices (Breeze, Runners) TCP based API.

Parameters:

Name Type Description Default
ip_address str

the ip address assigned to the device.

required
device_id str

the id of the desired device.

required
device_key str

the login key of the device.

required
Source code in src/aioswitcher/api/__init__.py
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
@final
class SwitcherType2Api(SwitcherApi):
    """Switcher Type2 devices (Breeze, Runners) TCP based API.

    Args:
        ip_address: the ip address assigned to the device.
        device_id: the id of the desired device.
        device_key: the login key of the device.
    """

    def __init__(self, ip_address: str, device_id: str, device_key: str) -> None:
        """Set up the API client using the Type2 TCP port constant."""
        port = SWITCHER_TCP_PORT_TYPE2
        super().__init__(ip_address, device_id, device_key, port)

    async def control_breeze_device(
        self,
        remote: SwitcherBreezeRemote,
        state: Union[DeviceState, None] = None,
        mode: Union[ThermostatMode, None] = None,
        target_temp: int = 0,
        fan_level: Union[ThermostatFanLevel, None] = None,
        swing: Union[ThermostatSwing, None] = None,
        update_state: bool = False,
    ) -> SwitcherBaseResponse:
        """Log in and send a control (or status update) packet to a Breeze device.

        Args:
            remote: the remote for the breeze device
            state: optionally the desired state of the device
            mode: optionally the desired mode of the device
            target_temp: optionally the target temperature
            fan_level: optionally the desired fan level
            swing: optionally the desired swing state
            update_state: update the device state without controlling the device

        Returns:
            An instance of ``SwitcherBaseResponse``.

        Raises:
            RuntimeError: if the login, the state retrieval or the command was
                not successful, or if no command was sent at all.

        """
        timestamp, login_resp = await self._login(DeviceType.BREEZE)
        if not login_resp.successful:
            logger.error("Failed to log into device id %s", self._device_id)
            raise RuntimeError("login request was not successful")

        logger.debug(
            "logged in session_id=%s, timestamp=%s", login_resp.session_id, timestamp
        )

        cmd_response: Union[SwitcherBaseResponse, None] = None

        # A swing-only request on a remote with a separated swing command is
        # handled by the dedicated swing packet further down instead.
        needs_main_command = (
            state
            or mode
            or target_temp
            or fan_level
            or (swing and not remote._separated_swing_command)
        )
        if needs_main_command:
            device_state = await self._get_breeze_state(timestamp, login_resp)
            if not device_state.successful:
                raise RuntimeError("get state request was not successful")

            logger.debug("got current breeze device state")

            # Settings the caller left unset fall back to the values just read.
            state = state or device_state.state
            mode = mode or device_state.mode
            target_temp = target_temp or device_state.target_temperature
            fan_level = fan_level or device_state.fan_level
            set_swing = swing or device_state.swing
            if remote._separated_swing_command:
                set_swing = ThermostatSwing.OFF

            if not update_state:
                command = remote.build_command(
                    state, mode, target_temp, fan_level, set_swing, device_state.state
                )
                unsigned = packets.BREEZE_COMMAND_PACKET.format(
                    login_resp.session_id,
                    timestamp,
                    self._device_id,
                    command.length,
                    command.command,
                )
                logger.debug("sending a control packet")
            else:
                unsigned = packets.BREEZE_UPDATE_STATUS_PACKET.format(
                    login_resp.session_id,
                    timestamp,
                    self._device_id,
                    state.value,
                    mode.value,
                    target_temp,
                    fan_level.value,
                    set_swing.value,
                )
                logger.debug("sending a set status packet")

            signed = sign_packet_with_crc_key(set_message_length(unsigned))

            self._writer.write(unhexlify(signed))
            raw_reply = await self._reader.read(1024)
            cmd_response = SwitcherBaseResponse(raw_reply)

            if not cmd_response.successful:
                raise RuntimeError("set state request was not successful")

        if remote._separated_swing_command and swing and not update_state:
            # if device is SPECIAL SWING device and user requested a swing change
            cmd_response = await self._control_breeze_swing_device(
                timestamp, login_resp.session_id, remote, swing
            )

        if not cmd_response:
            raise RuntimeError("control breeze device failed")
        return cmd_response

    async def _control_breeze_swing_device(
        self,
        timestamp: str,
        session_id: str,
        remote: SwitcherBreezeRemote,
        swing: ThermostatSwing,
    ) -> SwitcherBaseResponse:
        """Send the swing command built by the remote to the Breeze device.

        Args:
            timestamp: the timestamp from the login response
            session_id: the session_id from the login response
            remote: the remote for the breeze device
            swing: the desired swing state

        Returns:
            An instance of ``SwitcherBaseResponse``.

        """
        logger.debug("about to send Breeze special swing command")
        swing_command = remote.build_swing_command(swing)
        unsigned = packets.BREEZE_COMMAND_PACKET.format(
            session_id,
            timestamp,
            self._device_id,
            swing_command.length,
            swing_command.command,
        )
        signed = sign_packet_with_crc_key(set_message_length(unsigned))

        logger.debug("sending a control packet")

        self._writer.write(unhexlify(signed))
        raw_reply = await self._reader.read(1024)
        return SwitcherBaseResponse(raw_reply)

    async def set_position(self, position: int = 0) -> SwitcherBaseResponse:
        """Use for setting the shutter position of the Runner and Runner Mini devices.

        Args:
            position: the position to set the device to, default to 0.

        Returns:
            An instance of ``SwitcherBaseResponse``.

        Raises:
            RuntimeError: if the login was not successful.

        """
        # Lowercase hex, zero-padded to a minimum width of two characters.
        hex_pos = f"{position:02x}"

        logger.debug("about to send set position command")
        timestamp, login_resp = await self._login(DeviceType.RUNNER)
        if not login_resp.successful:
            logger.error("Failed to log into device with id %s", self._device_id)
            raise RuntimeError("login request was not successful")

        logger.debug(
            "logged in session_id=%s, timestamp=%s", login_resp.session_id, timestamp
        )

        unsigned = packets.RUNNER_SET_POSITION.format(
            login_resp.session_id, timestamp, self._device_id, hex_pos
        )
        signed = sign_packet_with_crc_key(set_message_length(unsigned))

        logger.debug("sending a control packet")

        self._writer.write(unhexlify(signed))
        raw_reply = await self._reader.read(1024)
        return SwitcherBaseResponse(raw_reply)

    async def get_breeze_state(self) -> SwitcherThermostatStateResponse:
        """Log in and fetch the current state of the Breeze device.

        Returns:
            An instance of ``SwitcherThermostatStateResponse``.

        Raises:
            RuntimeError: if the login was not successful.

        """
        timestamp, login_resp = await self._login(DeviceType.BREEZE)
        if not login_resp.successful:
            raise RuntimeError("login request was not successful")
        return await self._get_breeze_state(timestamp, login_resp)

    async def _get_breeze_state(
        self, timestamp: str, login_resp: SwitcherLoginResponse
    ) -> SwitcherThermostatStateResponse:
        """Send the get state packet using an already established login.

        Args:
            timestamp: the timestamp from the login response
            login_resp: the login response providing the session id

        Returns:
            An instance of ``SwitcherThermostatStateResponse``.

        Raises:
            RuntimeError: if building the response raised ``KeyError`` or
                ``ValueError``.

        """
        unsigned = packets.GET_STATE_PACKET2_TYPE2.format(
            login_resp.session_id, timestamp, self._device_id
        )
        signed = sign_packet_with_crc_key(unsigned)

        logger.debug("sending a get state packet")
        self._writer.write(unhexlify(signed))
        raw_state = await self._reader.read(1024)
        try:
            return SwitcherThermostatStateResponse(raw_state)
        except (KeyError, ValueError) as ve:
            raise RuntimeError("get breeze state request was not successful") from ve

    async def get_shutter_state(self) -> SwitcherShutterStateResponse:
        """Log in and fetch the current state of the Runner device.

        Returns:
            An instance of ``SwitcherShutterStateResponse``.

        Raises:
            RuntimeError: if the login was not successful, or if building the
                response raised ``KeyError`` or ``ValueError``.

        """
        timestamp, login_resp = await self._login(DeviceType.RUNNER)
        if not login_resp.successful:
            raise RuntimeError("login request was not successful")

        unsigned = packets.GET_STATE_PACKET2_TYPE2.format(
            login_resp.session_id, timestamp, self._device_id
        )
        signed = sign_packet_with_crc_key(unsigned)

        logger.debug("sending a get state packet")
        self._writer.write(unhexlify(signed))
        raw_state = await self._reader.read(1024)
        try:
            return SwitcherShutterStateResponse(raw_state)
        except (KeyError, ValueError) as ve:
            raise RuntimeError(
                "get shutter state request was not successful"
            ) from ve

__init__(ip_address, device_id, device_key) ⚓︎

Initialize the Switcher TCP connection API.

Source code in src/aioswitcher/api/__init__.py
560
561
562
def __init__(self, ip_address: str, device_id: str, device_key: str) -> None:
    """Set up the API client using the Type2 TCP port constant.

    Args:
        ip_address: the ip address assigned to the device.
        device_id: the id of the desired device.
        device_key: the login key of the device.

    """
    # The port is not caller-supplied here; the module constant is always used.
    port = SWITCHER_TCP_PORT_TYPE2
    super().__init__(ip_address, device_id, device_key, port)

control_breeze_device(remote, state=None, mode=None, target_temp=0, fan_level=None, swing=None, update_state=False) async ⚓︎

Use for sending the control packet to the Breeze device.

Parameters:

Name Type Description Default
remote SwitcherBreezeRemote

the remote for the breeze device

required
state Union[DeviceState, None]

optionally the desired state of the device

None
mode Union[ThermostatMode, None]

optionally the desired mode of the device

None
target_temp int

optionally the target temperature

0
fan_level Union[ThermostatFanLevel, None]

optionally the desired fan level

None
swing Union[ThermostatSwing, None]

optionally the desired swing state

None
update_state bool

update the device state without controlling the device

False

Returns:

Type Description
SwitcherBaseResponse

An instance of SwitcherBaseResponse.

Source code in src/aioswitcher/api/__init__.py
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
async def control_breeze_device(
    self,
    remote: SwitcherBreezeRemote,
    state: Union[DeviceState, None] = None,
    mode: Union[ThermostatMode, None] = None,
    target_temp: int = 0,
    fan_level: Union[ThermostatFanLevel, None] = None,
    swing: Union[ThermostatSwing, None] = None,
    update_state: bool = False,
) -> SwitcherBaseResponse:
    """Log in and send a control (or status update) packet to a Breeze device.

    Args:
        remote: the remote for the breeze device
        state: optionally the desired state of the device
        mode: optionally the desired mode of the device
        target_temp: optionally the target temperature
        fan_level: optionally the desired fan level
        swing: optionally the desired swing state
        update_state: update the device state without controlling the device

    Returns:
        An instance of ``SwitcherBaseResponse``.

    Raises:
        RuntimeError: if the login, the state retrieval or the command was
            not successful, or if no command was sent at all.

    """
    timestamp, login_resp = await self._login(DeviceType.BREEZE)
    if not login_resp.successful:
        logger.error("Failed to log into device id %s", self._device_id)
        raise RuntimeError("login request was not successful")

    logger.debug(
        "logged in session_id=%s, timestamp=%s", login_resp.session_id, timestamp
    )

    cmd_response: Union[SwitcherBaseResponse, None] = None

    # A swing-only request on a remote with a separated swing command is
    # handled by the dedicated swing packet further down instead.
    needs_main_command = (
        state
        or mode
        or target_temp
        or fan_level
        or (swing and not remote._separated_swing_command)
    )
    if needs_main_command:
        device_state = await self._get_breeze_state(timestamp, login_resp)
        if not device_state.successful:
            raise RuntimeError("get state request was not successful")

        logger.debug("got current breeze device state")

        # Settings the caller left unset fall back to the values just read.
        state = state or device_state.state
        mode = mode or device_state.mode
        target_temp = target_temp or device_state.target_temperature
        fan_level = fan_level or device_state.fan_level
        set_swing = swing or device_state.swing
        if remote._separated_swing_command:
            set_swing = ThermostatSwing.OFF

        if not update_state:
            command = remote.build_command(
                state, mode, target_temp, fan_level, set_swing, device_state.state
            )
            unsigned = packets.BREEZE_COMMAND_PACKET.format(
                login_resp.session_id,
                timestamp,
                self._device_id,
                command.length,
                command.command,
            )
            logger.debug("sending a control packet")
        else:
            unsigned = packets.BREEZE_UPDATE_STATUS_PACKET.format(
                login_resp.session_id,
                timestamp,
                self._device_id,
                state.value,
                mode.value,
                target_temp,
                fan_level.value,
                set_swing.value,
            )
            logger.debug("sending a set status packet")

        signed = sign_packet_with_crc_key(set_message_length(unsigned))

        self._writer.write(unhexlify(signed))
        raw_reply = await self._reader.read(1024)
        cmd_response = SwitcherBaseResponse(raw_reply)

        if not cmd_response.successful:
            raise RuntimeError("set state request was not successful")

    if remote._separated_swing_command and swing and not update_state:
        # if device is SPECIAL SWING device and user requested a swing change
        cmd_response = await self._control_breeze_swing_device(
            timestamp, login_resp.session_id, remote, swing
        )

    if not cmd_response:
        raise RuntimeError("control breeze device failed")
    return cmd_response

get_breeze_state() async ⚓︎

Use for sending the get state packet to the Breeze device.

Returns:

Type Description
SwitcherThermostatStateResponse

An instance of SwitcherThermostatStateResponse.

Source code in src/aioswitcher/api/__init__.py
738
739
740
741
742
743
744
745
746
747
748
async def get_breeze_state(self) -> SwitcherThermostatStateResponse:
    """Log in and fetch the current state of the Breeze device.

    Returns:
        An instance of ``SwitcherThermostatStateResponse``.

    Raises:
        RuntimeError: if the login was not successful.

    """
    timestamp, login_resp = await self._login(DeviceType.BREEZE)
    if not login_resp.successful:
        raise RuntimeError("login request was not successful")
    return await self._get_breeze_state(timestamp, login_resp)

get_shutter_state() async ⚓︎

Use for sending the get state packet to the Runner device.

Returns:

Type Description
SwitcherShutterStateResponse

An instance of SwitcherShutterStateResponse.

Source code in src/aioswitcher/api/__init__.py
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
async def get_shutter_state(self) -> SwitcherShutterStateResponse:
    """Log in and fetch the current state of the Runner device.

    Returns:
        An instance of ``SwitcherShutterStateResponse``.

    Raises:
        RuntimeError: if the login was not successful, or if building the
            response raised ``KeyError`` or ``ValueError``.

    """
    timestamp, login_resp = await self._login(DeviceType.RUNNER)
    if not login_resp.successful:
        raise RuntimeError("login request was not successful")

    unsigned = packets.GET_STATE_PACKET2_TYPE2.format(
        login_resp.session_id, timestamp, self._device_id
    )
    signed = sign_packet_with_crc_key(unsigned)

    logger.debug("sending a get state packet")
    self._writer.write(unhexlify(signed))
    raw_state = await self._reader.read(1024)
    try:
        return SwitcherShutterStateResponse(raw_state)
    except (KeyError, ValueError) as ve:
        raise RuntimeError(
            "get shutter state request was not successful"
        ) from ve

set_position(position=0) async ⚓︎

Use for setting the shutter position of the Runner and Runner Mini devices.

Parameters:

Name Type Description Default
position int

the position to set the device to, default to 0.

0

Returns:

Type Description
SwitcherBaseResponse

An instance of SwitcherBaseResponse.

Source code in src/aioswitcher/api/__init__.py
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
async def set_position(self, position: int = 0) -> SwitcherBaseResponse:
    """Use for setting the shutter position of the Runner and Runner Mini devices.

    Args:
        position: the position to set the device to, default to 0.

    Returns:
        An instance of ``SwitcherBaseResponse``.

    Raises:
        RuntimeError: if the login was not successful.

    """
    # Lowercase hex, zero-padded to a minimum width of two characters.
    hex_pos = f"{position:02x}"

    logger.debug("about to send set position command")
    timestamp, login_resp = await self._login(DeviceType.RUNNER)
    if not login_resp.successful:
        logger.error("Failed to log into device with id %s", self._device_id)
        raise RuntimeError("login request was not successful")

    logger.debug(
        "logged in session_id=%s, timestamp=%s", login_resp.session_id, timestamp
    )

    unsigned = packets.RUNNER_SET_POSITION.format(
        login_resp.session_id, timestamp, self._device_id, hex_pos
    )
    signed = sign_packet_with_crc_key(set_message_length(unsigned))

    logger.debug("sending a control packet")

    self._writer.write(unhexlify(signed))
    raw_reply = await self._reader.read(1024)
    return SwitcherBaseResponse(raw_reply)

Switcher integration TCP socket API messages.

StateMessageParser dataclass ⚓︎

Use for parsing api messages.

Source code in src/aioswitcher/api/messages.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
@final
@dataclass
class StateMessageParser:
    """Extract individual state fields from a raw api response.

    The response is hexlified once on construction and every getter reads a
    fixed slice of that hex representation.
    """

    response: InitVar[bytes]

    def __post_init__(self, response: bytes) -> None:
        """Keep the hexlified response for the getters to slice."""
        self._hex_response = hexlify(response)

    def _reversed_pairs_to_int(self, start: int, pair_count: int) -> int:
        """Parse ``pair_count`` hex pairs starting at ``start``, last pair first."""
        field = self._hex_response[start : start + 8]
        offsets = range(2 * (pair_count - 1), -1, -2)
        return int(b"".join(field[pos : pos + 2] for pos in offsets), 16)

    def get_power_consumption(self) -> int:
        """Return the current power consumption of the device."""
        return self._reversed_pairs_to_int(154, 2)

    def get_time_left(self) -> str:
        """Return the time left for the device current run."""
        return seconds_to_iso_time(self._reversed_pairs_to_int(178, 4))

    def get_time_on(self) -> str:
        """Return how long the device has been on."""
        return seconds_to_iso_time(self._reversed_pairs_to_int(186, 4))

    def get_auto_shutdown(self) -> str:
        """Return the value of the auto shutdown configuration."""
        return seconds_to_iso_time(self._reversed_pairs_to_int(194, 4))

    def get_state(self) -> DeviceState:
        """Return the current device state."""
        code = self._hex_response[150:152].decode()
        by_value = {member.value: member for member in DeviceState}
        return by_value[code]

    def get_thermostat_state(self) -> DeviceState:
        """Return the current thermostat state."""
        # Anything other than the OFF value is reported as ON.
        if self._hex_response[156:158].decode() == DeviceState.OFF.value:
            return DeviceState.OFF
        return DeviceState.ON

    def get_thermostat_mode(self) -> ThermostatMode:
        """Return the current thermostat mode, ``COOL`` for unknown values."""
        code = self._hex_response[158:160].decode()
        by_value = {member.value: member for member in ThermostatMode}
        return by_value.get(code, ThermostatMode.COOL)

    def get_thermostat_temp(self) -> float:
        """Return the current temp of the thermostat."""
        first_pair = self._hex_response[152:154]
        second_pair = self._hex_response[154:156]
        # The two pairs are swapped before parsing; the value is then divided by 10.
        return int(second_pair + first_pair, 16) / 10

    def get_thermostat_target_temp(self) -> int:
        """Return the target temperature of the thermostat."""
        return int(self._hex_response[160:162], 16)

    def get_thermostat_fan_level(self) -> ThermostatFanLevel:
        """Return the current thermostat fan level, ``LOW`` for unknown values."""
        code = self._hex_response[162:163].decode()
        by_value = {member.value: member for member in ThermostatFanLevel}
        return by_value.get(code, ThermostatFanLevel.LOW)

    def get_thermostat_swing(self) -> ThermostatSwing:
        """Return the current thermostat fan swing."""
        # Anything other than the OFF value is reported as ON.
        if self._hex_response[163:164].decode() == ThermostatSwing.OFF.value:
            return ThermostatSwing.OFF
        return ThermostatSwing.ON

    def get_thermostat_remote_id(self) -> str:
        """Return the current thermostat remote."""
        raw = unhexlify(self._hex_response)
        # Trailing NUL characters are stripped from the decoded slice.
        return raw[84:92].decode().rstrip("\x00")

    def get_shutter_position(self) -> int:
        """Return the current shutter position."""
        return int(self._hex_response[152:154].decode(), 16)

    def get_shutter_direction(self) -> ShutterDirection:
        """Return the current shutter direction."""
        code = self._hex_response[156:160].decode()
        by_value = {member.value: member for member in ShutterDirection}
        return by_value[code]

__post_init__(response) ⚓︎

Post initialization of the parser.

Source code in src/aioswitcher/api/messages.py
39
40
41
def __post_init__(self, response: bytes) -> None:
    """Post initialization of the parser.

    Args:
        response: the raw response bytes to parse.

    """
    # Keep the hex-encoded form; the getters slice it by hex offsets.
    self._hex_response = hexlify(response)

get_auto_shutdown() ⚓︎

Return the value of the auto shutdown configuration.

Source code in src/aioswitcher/api/messages.py
69
70
71
72
73
74
75
76
77
78
79
def get_auto_shutdown(self) -> str:
    """Return the value of the auto shutdown configuration.

    The value is four hex-encoded bytes in little-endian order (hex offsets
    194-202); the decoded seconds are formatted by ``seconds_to_iso_time``.
    """
    raw = self._hex_response[194:202]
    # Reverse the byte order: the last hex pair is the most significant one.
    byte3, byte2, byte1, byte0 = (raw[pos : pos + 2] for pos in (6, 4, 2, 0))
    shutdown_seconds = int(byte3 + byte2 + byte1 + byte0, 16)
    return seconds_to_iso_time(shutdown_seconds)

get_power_consumption() ⚓︎

Return the current power consumption of the device.

Source code in src/aioswitcher/api/messages.py
43
44
45
46
def get_power_consumption(self) -> int:
    """Return the current power consumption of the device.

    Decoded from two hex-encoded bytes in little-endian order (hex offsets
    154-158).
    """
    low_byte = self._hex_response[154:156]
    high_byte = self._hex_response[156:158]
    return int(high_byte + low_byte, 16)

get_shutter_direction() ⚓︎

Return the current shutter direction.

Source code in src/aioswitcher/api/messages.py
138
139
140
141
142
def get_shutter_direction(self) -> ShutterDirection:
    """Return the current shutter direction.

    Raises:
        KeyError: if the reported value matches no ``ShutterDirection``.
    """
    direction_code = self._hex_response[156:160].decode()
    by_value = {direction.value: direction for direction in ShutterDirection}
    return by_value[direction_code]

get_shutter_position() ⚓︎

Return the current shutter position.

Source code in src/aioswitcher/api/messages.py
133
134
135
136
def get_shutter_position(self) -> int:
    """Return the current shutter position.

    Decoded from the single hex-encoded byte at hex offsets 152-154.
    """
    position_hex = self._hex_response[152:154]
    return int(position_hex.decode(), 16)

get_state() ⚓︎

Return the current device state.

Source code in src/aioswitcher/api/messages.py
81
82
83
84
85
def get_state(self) -> DeviceState:
    """Return the current device state.

    Raises:
        KeyError: if the reported value matches no ``DeviceState``.
    """
    state_code = self._hex_response[150:152].decode()
    by_value = {state.value: state for state in DeviceState}
    return by_value[state_code]

get_thermostat_fan_level() ⚓︎

Return the current thermostat fan level.

Source code in src/aioswitcher/api/messages.py
110
111
112
113
114
115
116
117
def get_thermostat_fan_level(self) -> ThermostatFanLevel:
    """Return the current thermostat fan level.

    Falls back to ``ThermostatFanLevel.LOW`` when the reported value does
    not match the value of any known level.
    """
    reported = self._hex_response[162:163].decode()
    by_value = {level.value: level for level in ThermostatFanLevel}
    return by_value.get(reported, ThermostatFanLevel.LOW)

get_thermostat_mode() ⚓︎

Return the current thermostat mode.

Source code in src/aioswitcher/api/messages.py
92
93
94
95
96
97
98
99
def get_thermostat_mode(self) -> ThermostatMode:
    """Return the current thermostat mode.

    Falls back to ``ThermostatMode.COOL`` when the reported value does not
    match the value of any known mode.
    """
    mode_hex = self._hex_response[158:160]
    by_value = {known.value: known for known in ThermostatMode}
    return by_value.get(mode_hex.decode(), ThermostatMode.COOL)

get_thermostat_remote_id() ⚓︎

Return the current thermostat remote.

Source code in src/aioswitcher/api/messages.py
128
129
130
131
def get_thermostat_remote_id(self) -> str:
    """Return the current thermostat remote id.

    The id is raw bytes 84-92 of the response, decoded as text with the
    trailing NUL padding stripped.
    """
    raw_response = unhexlify(self._hex_response)
    id_field = raw_response[84:92]
    return id_field.decode().rstrip("\x00")

get_thermostat_state() ⚓︎

Return the current thermostat state.

Source code in src/aioswitcher/api/messages.py
87
88
89
90
def get_thermostat_state(self) -> DeviceState:
    """Return the current thermostat state.

    Any reported value other than the OFF value is treated as ON.
    """
    power_code = self._hex_response[156:158].decode()
    if power_code == DeviceState.OFF.value:
        return DeviceState.OFF
    return DeviceState.ON

get_thermostat_swing() ⚓︎

Return the current thermostat fan swing.

Source code in src/aioswitcher/api/messages.py
119
120
121
122
123
124
125
126
def get_thermostat_swing(self) -> ThermostatSwing:
    """Return the current thermostat fan swing.

    Any reported value other than the OFF value is treated as ON.
    """
    swing_flag = self._hex_response[163:164].decode()
    if swing_flag == ThermostatSwing.OFF.value:
        return ThermostatSwing.OFF
    return ThermostatSwing.ON

get_thermostat_target_temp() ⚓︎

Return the target temperature of the thermostat.

Source code in src/aioswitcher/api/messages.py
105
106
107
108
def get_thermostat_target_temp(self) -> int:
    """Return the target temperature of the thermostat.

    Decoded from the single hex-encoded byte at hex offsets 160-162.
    """
    target_hex = self._hex_response[160:162]
    return int(target_hex, base=16)

get_thermostat_temp() ⚓︎

Return the current temp of the thermostat.

Source code in src/aioswitcher/api/messages.py
101
102
103
def get_thermostat_temp(self) -> float:
    """Return the thermostat's current temperature reading.

    The reading is two hex-encoded bytes in little-endian order (hex
    offsets 152-156); the decoded integer is divided by 10.
    """
    low_byte = self._hex_response[152:154]
    high_byte = self._hex_response[154:156]
    return int(high_byte + low_byte, 16) / 10

get_time_left() ⚓︎

Return the time left for the device current run.

Source code in src/aioswitcher/api/messages.py
48
49
50
51
52
53
54
55
56
57
58
def get_time_left(self) -> str:
    """Return the time left for the device current run.

    The value is four hex-encoded bytes in little-endian order (hex offsets
    178-186); the decoded seconds are formatted by ``seconds_to_iso_time``.
    """
    raw = self._hex_response[178:186]
    # Reverse the byte order: the last hex pair is the most significant one.
    byte3, byte2, byte1, byte0 = (raw[pos : pos + 2] for pos in (6, 4, 2, 0))
    remaining_seconds = int(byte3 + byte2 + byte1 + byte0, 16)
    return seconds_to_iso_time(remaining_seconds)

get_time_on() ⚓︎

Return how long the device has been on.

Source code in src/aioswitcher/api/messages.py
60
61
62
63
64
65
66
67
def get_time_on(self) -> str:
    """Return how long the device has been on.

    The value is four hex-encoded bytes in little-endian order (hex offsets
    186-194); the decoded seconds are formatted by ``seconds_to_iso_time``.
    """
    raw = self._hex_response[186:194]
    # Reverse the byte order: the last hex pair is the most significant one.
    byte3, byte2, byte1, byte0 = (raw[pos : pos + 2] for pos in (6, 4, 2, 0))
    elapsed_seconds = int(byte3 + byte2 + byte1 + byte0, 16)
    return seconds_to_iso_time(elapsed_seconds)

SwitcherBaseResponse dataclass ⚓︎

Representation of the switcher base response message.

Applicable for all messages that do not require post initialization, e.g. not applicable for SwitcherLoginResponse, SwitcherStateResponse, SwitcherGetScheduleResponse.

Parameters:

Name Type Description Default
unparsed_response bytes

the raw response from the device.

required
Source code in src/aioswitcher/api/messages.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
@dataclass
class SwitcherBaseResponse:
    """Representation of the switcher base response message.

    Applicable for all messages that do not require post initialization,
    e.g. not applicable for SwitcherLoginResponse, SwitcherStateResponse,
    SwitcherGetScheduleResponse.

    Args:
        unparsed_response: the raw response from the device.

    """

    unparsed_response: bytes

    @property
    def successful(self) -> bool:
        """Return true if the response is neither ``None`` nor empty.

        A non-empty response only partially indicates that the request was
        successful.
        """
        raw = self.unparsed_response
        if raw is None:
            return False
        return len(raw) > 0

successful: bool property ⚓︎

Return true if the response is not empty.

Partially indicating the request was successful.

SwitcherGetSchedulesResponse dataclass ⚓︎

Bases: SwitcherBaseResponse

Representation of the switcher get schedule message.

Source code in src/aioswitcher/api/messages.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
@final
@dataclass
class SwitcherGetSchedulesResponse(SwitcherBaseResponse):
    """Representation of the switcher get schedule message.

    ``schedules`` is not an init argument; it is filled from
    ``unparsed_response`` right after construction.
    """

    schedules: Set[SwitcherSchedule] = field(init=False)

    def __post_init__(self) -> None:
        """Parse the raw response into the set of schedules."""
        parsed_schedules = get_schedules(self.unparsed_response)
        self.schedules = parsed_schedules

    @property
    def found_schedules(self) -> bool:
        """Return true if the response contained at least one schedule."""
        schedule_count = len(self.schedules)
        return schedule_count > 0

found_schedules: bool property ⚓︎

Return true if found schedules in the response.

__post_init__() ⚓︎

Post initialization of the message.

Source code in src/aioswitcher/api/messages.py
215
216
217
def __post_init__(self) -> None:
    """Parse the raw response into the set of schedules."""
    parsed_schedules = get_schedules(self.unparsed_response)
    self.schedules = parsed_schedules

SwitcherLoginResponse dataclass ⚓︎

Bases: SwitcherBaseResponse

Representations of the switcher login response message.

Source code in src/aioswitcher/api/messages.py
169
170
171
172
173
174
175
176
177
178
179
180
181
@final
@dataclass
class SwitcherLoginResponse(SwitcherBaseResponse):
    """Representation of the switcher login response message.

    ``session_id`` is not an init argument; it is extracted from
    ``unparsed_response`` right after construction.
    """

    session_id: str = field(init=False)

    def __post_init__(self) -> None:
        """Extract the session id from the raw response.

        The session id is hex characters 16-24 of the hex-encoded response,
        i.e. raw bytes 8-12.

        Raises:
            ValueError: if the response cannot be hex-encoded or decoded.
        """
        try:
            hex_response = hexlify(self.unparsed_response)
            self.session_id = hex_response[16:24].decode()
        except Exception as exc:
            raise ValueError("failed to parse login response message") from exc

__post_init__() ⚓︎

Post initialization of the response.

Source code in src/aioswitcher/api/messages.py
176
177
178
179
180
181
def __post_init__(self) -> None:
    """Extract the session id from the raw response.

    The session id is hex characters 16-24 of the hex-encoded response,
    i.e. raw bytes 8-12.

    Raises:
        ValueError: if the response cannot be hex-encoded or decoded.
    """
    try:
        hex_response = hexlify(self.unparsed_response)
        self.session_id = hex_response[16:24].decode()
    except Exception as exc:
        raise ValueError("failed to parse login response message") from exc

SwitcherShutterStateResponse dataclass ⚓︎

Bases: SwitcherBaseResponse

Representation of the Switcher shutter devices state response message.

Source code in src/aioswitcher/api/messages.py
251
252
253
254
255
256
257
258
259
260
261
262
263
264
@final
@dataclass
class SwitcherShutterStateResponse(SwitcherBaseResponse):
    """Representation of the Switcher shutter devices state response message.

    ``position`` and ``direction`` are not init arguments; both are parsed
    from ``unparsed_response`` right after construction.
    """

    position: int = field(init=False)
    direction: ShutterDirection = field(init=False)

    def __post_init__(self) -> None:
        """Parse the shutter direction and position from the raw response."""
        state_parser = StateMessageParser(self.unparsed_response)
        self.direction = state_parser.get_shutter_direction()
        self.position = state_parser.get_shutter_position()

__post_init__() ⚓︎

Post initialization of the message.

Source code in src/aioswitcher/api/messages.py
259
260
261
262
263
264
def __post_init__(self) -> None:
    """Parse the shutter direction and position from the raw response."""
    state_parser = StateMessageParser(self.unparsed_response)
    self.direction = state_parser.get_shutter_direction()
    self.position = state_parser.get_shutter_position()

SwitcherStateResponse dataclass ⚓︎

Bases: SwitcherBaseResponse

Representation of the switcher state response message.

Source code in src/aioswitcher/api/messages.py
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
@final
@dataclass
class SwitcherStateResponse(SwitcherBaseResponse):
    """Representation of the switcher state response message.

    None of the state fields are init arguments; all of them are parsed from
    ``unparsed_response`` right after construction.
    """

    state: DeviceState = field(init=False)
    time_left: str = field(init=False)
    time_on: str = field(init=False)
    auto_shutdown: str = field(init=False)
    power_consumption: int = field(init=False)
    electric_current: float = field(init=False)

    def __post_init__(self) -> None:
        """Parse the state fields from the raw response."""
        state_parser = StateMessageParser(self.unparsed_response)

        self.state = state_parser.get_state()
        self.time_left = state_parser.get_time_left()
        self.time_on = state_parser.get_time_on()
        self.auto_shutdown = state_parser.get_auto_shutdown()

        # The electric current is derived from the parsed power consumption.
        consumption = state_parser.get_power_consumption()
        self.power_consumption = consumption
        self.electric_current = watts_to_amps(consumption)

__post_init__() ⚓︎

Post initialization of the message.

Source code in src/aioswitcher/api/messages.py
196
197
198
199
200
201
202
203
204
205
def __post_init__(self) -> None:
    """Parse the state fields from the raw response."""
    state_parser = StateMessageParser(self.unparsed_response)

    self.state = state_parser.get_state()
    self.time_left = state_parser.get_time_left()
    self.time_on = state_parser.get_time_on()
    self.auto_shutdown = state_parser.get_auto_shutdown()

    # The electric current is derived from the parsed power consumption.
    consumption = state_parser.get_power_consumption()
    self.power_consumption = consumption
    self.electric_current = watts_to_amps(consumption)

SwitcherThermostatStateResponse dataclass ⚓︎

Bases: SwitcherBaseResponse

Representation of the Switcher thermostat device state response message.

Source code in src/aioswitcher/api/messages.py
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
@final
@dataclass
class SwitcherThermostatStateResponse(SwitcherBaseResponse):
    """Representation of the Switcher thermostat device state response message.

    None of the thermostat fields are init arguments; all of them are parsed
    from ``unparsed_response`` right after construction.
    """

    state: DeviceState = field(init=False)
    mode: ThermostatMode = field(init=False)
    fan_level: ThermostatFanLevel = field(init=False)
    temperature: float = field(init=False)
    target_temperature: int = field(init=False)
    swing: ThermostatSwing = field(init=False)
    remote_id: str = field(init=False)

    def __post_init__(self) -> None:
        """Parse the thermostat fields from the raw response."""
        state_parser = StateMessageParser(self.unparsed_response)

        self.state = state_parser.get_thermostat_state()
        self.mode = state_parser.get_thermostat_mode()
        self.fan_level = state_parser.get_thermostat_fan_level()
        self.temperature = state_parser.get_thermostat_temp()
        self.target_temperature = state_parser.get_thermostat_target_temp()
        self.swing = state_parser.get_thermostat_swing()
        self.remote_id = state_parser.get_thermostat_remote_id()

__post_init__() ⚓︎

Post initialization of the message.

Source code in src/aioswitcher/api/messages.py
238
239
240
241
242
243
244
245
246
247
248
def __post_init__(self) -> None:
    """Parse the thermostat fields from the raw response."""
    state_parser = StateMessageParser(self.unparsed_response)

    self.state = state_parser.get_thermostat_state()
    self.mode = state_parser.get_thermostat_mode()
    self.fan_level = state_parser.get_thermostat_fan_level()
    self.temperature = state_parser.get_thermostat_temp()
    self.target_temperature = state_parser.get_thermostat_target_temp()
    self.swing = state_parser.get_thermostat_swing()
    self.remote_id = state_parser.get_thermostat_remote_id()

Switcher integration API remote related classes and functions.

SwitcherBreezeCommand ⚓︎

Representations of the Switcher Breeze command message.

Parameters:

Name Type Description Default
command str

a string command ready to be parsed and sent

required
Source code in src/aioswitcher/api/remotes.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
@final
class SwitcherBreezeCommand:
    """Representations of the Switcher Breeze command message.

    Args:
        command: a string command ready to be parsed and sent

    """

    def __init__(self, command: str) -> None:
        """Store the command and derive its length field."""
        self.command = command
        self.length = self._get_command_length()

    def _get_command_length(self) -> str:
        """Return half the command's character count as 4-char hex text.

        The hex digits are left-aligned and padded on the right with zeros,
        e.g. 32 -> "2000".

        Note:
            This is a private function used by other functions, do not call this
            function directly.

        """
        # NOTE(review): a single hex digit is also right-padded (6 -> "6000");
        # confirm this is what the device expects for very short commands.
        half_length = len(self.command) // 2
        return format(half_length, "x").ljust(4, "0")

__init__(command) ⚓︎

Initialize the Breeze command.

Source code in src/aioswitcher/api/remotes.py
81
82
83
84
def __init__(self, command: str) -> None:
    """Initialize the Breeze command.

    Args:
        command: a string command ready to be parsed and sent.

    """
    self.command = command
    # Must run after ``command`` is stored: the length is derived from it.
    self.length = self._get_command_length()

SwitcherBreezeRemote ⚓︎

Class that represents a remote for Breeze device(s).

Parameters:

Name Type Description Default
ir_set Dict[str, Any]

a dictionary for all supported remotes

required
Source code in src/aioswitcher/api/remotes.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
@final
class SwitcherBreezeRemote:
    """Class that represent a remote for a Breeze device/s.

    Args:
        ir_set: a dictionary for all supported remotes

    """

    def __init__(self, ir_set: Dict[str, Any]) -> None:
        """Initialize the remote by parsing the ir_set data.

        Args:
            ir_set: the IRSet dictionary of the remote; "IRSetID" is read here,
                the rest is consumed by ``_resolve_capabilities``.

        """
        self._remote_id: str = ir_set["IRSetID"]
        self._on_off_type = False

        # Sentinels, replaced by the temperatures found while resolving the
        # capabilities.
        self._min_temp = 100  # ridiculously high number
        self._max_temp = -100  # ridiculously low number

        # Shrunk version of the ir_set file which ignores unused data and maps
        # each key to dict{"HexCode": str, "Para": str}; it is filled by the
        # _resolve_capabilities method.
        self._ir_wave_map: Dict[str, Dict[str, str]] = {}

        # Available features (swing / fan levels / temperature control) of
        # each mode. Example for the ELEC7022 IRSet remote:
        # {
        #     <ThermostatMode.AUTO: ('01', 'auto')>: {
        #         'swing': False,
        #         'fan_levels': set(),
        #         'temperature_control': False
        #     },
        #     <ThermostatMode.DRY: ('02', 'dry')>: {
        #         'swing': False,
        #         'fan_levels': set(),
        #         'temperature_control': False
        #     },
        #     <ThermostatMode.FAN: ('03', 'fan')>: {
        #         'swing': False,
        #         'fan_levels': {
        #             <ThermostatFanLevel.HIGH: ('3', 'high')>,
        #             <ThermostatFanLevel.AUTO: ('0', 'auto')>,
        #             <ThermostatFanLevel.MEDIUM: ('2', 'medium')>,
        #             <ThermostatFanLevel.LOW: ('1', 'low')>
        #         },
        #         'temperature_control': False
        #     },
        #     <ThermostatMode.COOL: ('04', 'cool')>: {
        #         'swing': False,
        #         'fan_levels': {
        #             <ThermostatFanLevel.HIGH: ('3', 'high')>,
        #             <ThermostatFanLevel.AUTO: ('0', 'auto')>,
        #             <ThermostatFanLevel.MEDIUM: ('2', 'medium')>,
        #             <ThermostatFanLevel.LOW: ('1', 'low')>
        #         },
        #         'temperature_control': True
        #     },
        #     <ThermostatMode.HEAT: ('05', 'heat')>: {
        #         'swing': True,
        #         'fan_levels': {
        #             <ThermostatFanLevel.HIGH: ('3', 'high')>,
        #             <ThermostatFanLevel.AUTO: ('0', 'auto')>,
        #             <ThermostatFanLevel.MEDIUM: ('2', 'medium')>,
        #             <ThermostatFanLevel.LOW: ('1', 'low')>
        #         },
        #         'temperature_control': True
        #     }
        # }
        self._modes_features: Dict[ThermostatMode, Dict[str, Any]] = {}

        # Must be set before resolving the capabilities, which reads it.
        uses_special_swing = self._remote_id in SPECIAL_SWING_COMMAND_REMOTE_IDS
        self._separated_swing_command = uses_special_swing

        self._resolve_capabilities(ir_set)

    @property
    def modes_features(
        self,
    ) -> Dict[ThermostatMode, Dict[str, Any]]:
        """Getter for the supported features per mode.

        Each mode maps to a dict with the keys "swing", "fan_levels" and
        "temperature_control". The internal dict itself is returned, not a copy.
        """
        return self._modes_features

    @property
    def supported_modes(self) -> List[ThermostatMode]:
        """Getter for supported modes."""
        return list(self.modes_features.keys())

    @property
    def max_temperature(self) -> int:
        """Getter for the maximum supported temperature.

        Stays at the initial -100 sentinel when no IR wave key carried a
        temperature.
        """
        return self._max_temp

    @property
    def min_temperature(self) -> int:
        """Getter for the minimum supported temperature.

        Stays at the initial 100 sentinel when no IR wave key carried a
        temperature.
        """
        return self._min_temp

    @property
    def remote_id(self) -> str:
        """Getter for the remote id, taken from the "IRSetID" entry of the ir_set."""
        return self._remote_id

    @property
    def separated_swing_command(self) -> bool:
        """Getter which indicates if the AC has a separated swing command.

        True when the remote id is listed in SPECIAL_SWING_COMMAND_REMOTE_IDS.
        """
        return self._separated_swing_command

    @property
    def on_off_type(self) -> bool:
        """Getter which indicates if the AC is of the on/off (toggle) type.

        True when the "OnOffType" entry of the ir_set equals 1.
        """
        return self._on_off_type

    def _lookup_key_in_irset(self, key: List[str]) -> None:
        """Use this to look for a key in the IRSet file.

        Args:
            key: a reference to List of strings representing parts of the command.

        Note:
            This is a private function used by other functions, do not call this
            function directly.

        """
        while (
            len(key) != 1
        ):  # we match this condition with the key contains at least the mode
            # Try to lookup the key as is in the ir set map
            if "".join(key) not in self._ir_wave_map:
                # we didn't find a key, remove feature from the key and try to
                # look again.
                # The first feature removed is the swing "_d1"
                # Secondly is the fan level (_f0, _f1, _f2, _f3)
                # lastly we stay at least with the mode part
                removed_element = key.pop()
                logger.debug(f"Removed {removed_element} from the key")
            else:
                # found a match, with modified list
                return

    def build_swing_command(self, swing: ThermostatSwing) -> SwitcherBreezeCommand:
        """Build a special command to control swing on special remotes.

        Args:
            swing: the desired swing state

        Returns:
            An instance of ``SwitcherBreezeCommand``

        Raises:
            RuntimeError: if the swing key, or its "Para"/"HexCode" entries,
                does not exist in the IRSet data of this remote.

        """
        key = "FUN_d0" if swing == ThermostatSwing.OFF else "FUN_d1"
        try:
            wave = self._ir_wave_map[key]
            command = wave["Para"] + "|" + wave["HexCode"]
        except KeyError as exc:
            # Build the message once so the log entry and the raised error
            # carry exactly the same single-line text.
            message = (
                f'The special swing key "{key}"'
                " does not exist in the IRSet database!"
            )
            logger.error(message)
            raise RuntimeError(message) from exc

        return SwitcherBreezeCommand("00000000" + hexlify(command.encode()).decode())

    def build_command(
        self,
        state: DeviceState,
        mode: ThermostatMode,
        target_temp: int,
        fan_level: ThermostatFanLevel,
        swing: ThermostatSwing,
        current_state: Union[DeviceState, None] = None,
    ) -> SwitcherBreezeCommand:
        """Build the command that controls the Breeze device.

        Args:
            state: the desired state of the device
            mode: the desired mode of the device
            target_temp: the target temperature, limited to the supported range
            fan_level: the desired fan level
            swing: the desired swing state
            current_state: optionally, for toggle device, pass previous state to avoid
                redundant requests

        Returns:
            An instance of ``SwitcherBreezeCommand``

        Raises:
            RuntimeError: if ``mode`` is not one of the supported modes.
            KeyError: if the resolved key does not exist in the IRSet data.

        """
        # Limit the target temperature to the maximum, otherwise to the minimum.
        if target_temp > self._max_temp:
            target_temp = self._max_temp
        elif target_temp < self._min_temp:
            target_temp = self._min_temp

        if mode not in self.supported_modes:
            raise RuntimeError(
                f'Invalid mode "{mode.display}", available modes for this device are: '
                f"{', '.join([x.display for x in self.supported_modes])}"
            )

        is_toggle = self._on_off_type
        parts: List[str] = []

        if not is_toggle and state == DeviceState.OFF:
            # non toggle AC, just turn it off
            parts.append("off")
        else:
            # Toggle AC: the "on_" prefix flips the power, so it is only added
            # when a known current state differs from the requested one.
            if is_toggle and current_state and current_state != state:
                parts.append("on_")

            # Toggle AC: always set the state. Non toggle AC: set the state
            # while turning it on.
            if is_toggle or state == DeviceState.ON:
                fan_modes = [
                    ThermostatMode.AUTO,
                    ThermostatMode.DRY,
                    ThermostatMode.FAN,
                ]
                temperature_modes = [ThermostatMode.COOL, ThermostatMode.HEAT]
                with_temperature = mode in temperature_modes

                if mode in fan_modes or with_temperature:
                    # the command key starts with the mode
                    parts.append(MODE_TO_COMMAND[mode])
                    if with_temperature:
                        parts.append(str(target_temp))
                    # Request the fan level (_f0, _f1, _f2, _f3) and the swing
                    # (_d1); the lookup below removes whatever the remote does
                    # not offer for this mode.
                    parts.append("_" + FAN_LEVEL_TO_COMMAND[fan_level])
                    if swing == ThermostatSwing.ON:
                        parts.append("_d1")

                    self._lookup_key_in_irset(parts)

        wave = self._ir_wave_map["".join(parts)]
        command = wave["Para"] + "|" + wave["HexCode"]
        return SwitcherBreezeCommand("00000000" + command.encode().hex())

    def _resolve_capabilities(self, ir_set: Dict[str, Any]) -> None:
        """Parse the ir_set of the remote and build capability data struct.

        Fills ``_ir_wave_map`` with every wave of the set, records per mode the
        available swing, fan levels and temperature control in
        ``_modes_features``, and tracks the lowest and highest temperature
        found in the wave keys.

        Args:
            ir_set: the IRSet dictionary of the remote; its "OnOffType" and
                "IRWaveList" entries are read here.

        Note:
            This is a private function used by other functions, do not call this
            function directly.

        """
        # An "OnOffType" of 1 marks the remote as the on/off (toggle) type.
        if ir_set["OnOffType"] == 1:
            self._on_off_type = True

        mode = None

        for wave in ir_set["IRWaveList"]:
            key = wave["Key"]
            try:
                # The first two characters of the key select the mode.
                mode = COMMAND_TO_MODE[key[0:2]]
                if mode not in self._modes_features:
                    self._modes_features[mode] = {
                        "swing": False,
                        "fan_levels": set(),
                        "temperature_control": False,
                    }

                    # ACs with a separated swing command support swing in
                    # every mode.
                    if self.separated_swing_command:
                        self._modes_features[mode]["swing"] = True

            except KeyError:
                # NOTE(review): for a key without a known mode prefix, `mode`
                # keeps the value of the previous wave, so the checks below are
                # applied to that previous mode - confirm this is intended.
                pass

            # A fan level appears as "f<digit>" after at least one character.
            fan_level = re.match(r".+(f\d)", key)
            if fan_level and mode:
                self._modes_features[mode]["fan_levels"].add(
                    COMMAND_TO_FAN_LEVEL[fan_level.group(1)]
                )

            # Two digits right after the mode prefix are the temperature.
            temp = key[2:4]
            if temp.isdigit():
                if mode and not self._modes_features[mode]["temperature_control"]:
                    self._modes_features[mode]["temperature_control"] = True
                temp = int(temp)
                if temp > self._max_temp:
                    self._max_temp = temp
                if temp < self._min_temp:
                    self._min_temp = temp

            # Any key containing "d1" marks swing as available for the mode.
            if mode:
                self._modes_features[mode]["swing"] |= "d1" in key

            # Every wave is kept, whether or not a mode was resolved for it.
            self._ir_wave_map[key] = {"Para": wave["Para"], "HexCode": wave["HexCode"]}

max_temperature: int property ⚓︎

Getter for Maximum supported temperature.

min_temperature: int property ⚓︎

Getter for Minimum supported temperature.

modes_features: Dict[ThermostatMode, Dict[str, Any]] property ⚓︎

Getter for supported feature per mode.

on_off_type: bool property ⚓︎

Getter which indicates if the AC is of the on/off (toggle) type.

remote_id: str property ⚓︎

Getter for remote id.

separated_swing_command: bool property ⚓︎

Getter which indicates if the AC has a separated swing command.

supported_modes: List[ThermostatMode] property ⚓︎

Getter for supported modes.

__init__(ir_set) ⚓︎

Initialize the remote by parsing the ir_set data.

Source code in src/aioswitcher/api/remotes.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
def __init__(self, ir_set: Dict[str, Any]) -> None:
    """Set up the remote's initial state, then parse ``ir_set`` into it.

    Args:
        ir_set: the IRSet definition of the remote; its ``"IRSetID"`` entry
            becomes the remote id and the whole dict is handed over to
            ``_resolve_capabilities``.

    """
    self._on_off_type = False
    # The temperature bounds start as out-of-range sentinels (a ridiculously
    # low maximum and a ridiculously high minimum).
    self._max_temp = -100
    self._min_temp = 100
    self._remote_id: str = ir_set["IRSetID"]

    # _ir_wave_map hosts a shrunk version of the ir_set file which ignores
    # unused data and maps key to dict{"HexCode": str, "Para": str}.
    # It is built by the _resolve_capabilities method.
    self._ir_wave_map: Dict[str, Dict[str, str]] = {}

    # _modes_features describes the features available in each mode
    # (swing / fan levels / temperature control).
    # Example of _modes_features for the ELEC7022 IRSet remote:
    # {
    #     < ThermostatMode.AUTO: ('01', 'auto') >: {
    #         'swing': False,
    #         'fan_levels': set(),
    #         'temperature_control': False
    #     }, < ThermostatMode.DRY: ('02', 'dry') >: {
    #         'swing': False,
    #         'fan_levels': set(),
    #         'temperature_control': False
    #     }, < ThermostatMode.FAN: ('03', 'fan') >: {
    #         'swing': False,
    #         'fan_levels': {
    #             < ThermostatFanLevel.HIGH: ('3', 'high') > ,
    #             < ThermostatFanLevel.AUTO: ('0', 'auto') > ,
    #             < ThermostatFanLevel.MEDIUM: ('2', 'medium') > ,
    #             < ThermostatFanLevel.LOW: ('1', 'low') >
    #         },
    #         'temperature_control': False
    #     }, < ThermostatMode.COOL: ('04', 'cool') >: {
    #         'swing': False,
    #         'fan_levels': {
    #             < ThermostatFanLevel.HIGH: ('3', 'high') > ,
    #             < ThermostatFanLevel.AUTO: ('0', 'auto') > ,
    #             < ThermostatFanLevel.MEDIUM: ('2', 'medium') > ,
    #             < ThermostatFanLevel.LOW: ('1', 'low') >
    #         },
    #         'temperature_control': True
    #     }, < ThermostatMode.HEAT: ('05', 'heat') >: {
    #         'swing': True,
    #         'fan_levels': {
    #             < ThermostatFanLevel.HIGH: ('3', 'high') > ,
    #             < ThermostatFanLevel.AUTO: ('0', 'auto') > ,
    #             < ThermostatFanLevel.MEDIUM: ('2', 'medium') > ,
    #             < ThermostatFanLevel.LOW: ('1', 'low') >
    #         },
    #         'temperature_control': True
    #     }
    # }
    self._modes_features: Dict[ThermostatMode, Dict[str, Any]] = {}

    # Some remotes control the swing through a dedicated command.
    self._separated_swing_command = (
        self._remote_id in SPECIAL_SWING_COMMAND_REMOTE_IDS
    )

    self._resolve_capabilities(ir_set)

build_command(state, mode, target_temp, fan_level, swing, current_state=None) ⚓︎

Build command that controls the Breeze device.

Parameters:

Name Type Description Default
state DeviceState

the desired state of the device

required
mode ThermostatMode

the desired mode of the device

required
target_temp int

the target temperature

required
fan_level ThermostatFanLevel

the desired fan level

required
swing ThermostatSwing

the desired swing state

required
current_state Union[DeviceState, None]

optionally, for toggle device, pass previous state to avoid redundant requests

None

Returns:

Type Description
SwitcherBreezeCommand

An instance of SwitcherBreezeCommand

Source code in src/aioswitcher/api/remotes.py
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
def build_command(
    self,
    state: DeviceState,
    mode: ThermostatMode,
    target_temp: int,
    fan_level: ThermostatFanLevel,
    swing: ThermostatSwing,
    current_state: Union[DeviceState, None] = None,
) -> SwitcherBreezeCommand:
    """Assemble the IR command that drives the Breeze device.

    The command key is built piece by piece (optional toggle prefix, mode,
    temperature, fan level, swing), resolved against the remote's IR wave map
    and finally hex-encoded into the command payload.

    Args:
        state: the desired state of the device
        mode: the desired mode of the device
        target_temp: the target temperature, clamped to the supported range
        fan_level: the desired fan level
        swing: the desired swing state
        current_state: optionally, for toggle device, pass previous state to avoid
            redundant requests

    Returns:
        An instance of ``SwitcherBreezeCommand``

    Raises:
        RuntimeError: when ``mode`` is not one of the supported modes.

    """
    # keep the requested temperature inside the range this remote supports
    if target_temp > self._max_temp:
        target_temp = self._max_temp
    elif target_temp < self._min_temp:
        target_temp = self._min_temp

    if mode not in self.supported_modes:
        raise RuntimeError(
            f'Invalid mode "{mode.display}", available modes for this device are: '
            f"{', '.join([x.display for x in self.supported_modes])}"
        )

    parts: List[str] = []
    is_toggle = self._on_off_type

    if not is_toggle and state == DeviceState.OFF:
        # non toggle AC: a plain "off" key is all that is needed
        parts.append("off")
    else:
        # toggle AC whose state has to flip: prefix the key with the toggle bit
        if is_toggle and current_state and current_state != state:
            parts.append("on_")

        # toggle AC always gets the full state; non toggle AC only when turned on
        if is_toggle or state == DeviceState.ON:
            with_temperature = mode in [ThermostatMode.COOL, ThermostatMode.HEAT]
            if with_temperature or mode in [
                ThermostatMode.AUTO,
                ThermostatMode.DRY,
                ThermostatMode.FAN,
            ]:
                # the key starts with the mode, cool/heat also carry the temperature
                parts.append(MODE_TO_COMMAND[mode])
                if with_temperature:
                    parts.append(str(target_temp))

                # requested fan level (_f0, _f1, _f2, _f3)
                parts.append("_" + FAN_LEVEL_TO_COMMAND[fan_level])

                # swing on (_d1)
                if swing == ThermostatSwing.ON:
                    parts.append("_d1")

                # let the remote reconcile the requested key with the IRSet keys
                self._lookup_key_in_irset(parts)

    wave = self._ir_wave_map["".join(parts)]
    command = wave["Para"] + "|" + wave["HexCode"]
    return SwitcherBreezeCommand(
        "00000000" + hexlify(str(command).encode()).decode()
    )

build_swing_command(swing) ⚓︎

Build a special command to control swing on special remotes.

Parameters:

Name Type Description Default
swing ThermostatSwing

the desired swing state

required

Returns:

Type Description
SwitcherBreezeCommand

An instance of SwitcherBreezeCommand

Source code in src/aioswitcher/api/remotes.py
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
def build_swing_command(self, swing: ThermostatSwing) -> SwitcherBreezeCommand:
    """Build a special command to control swing on special remotes.

    The key ``FUN_d0`` (swing off) or ``FUN_d1`` (any other swing value) is
    looked up in the remote's IR wave map and hex-encoded into the payload.

    Args:
        swing: the desired swing state

    Returns:
        An instance of ``SwitcherBreezeCommand``

    Raises:
        RuntimeError: when the swing key (or its ``Para``/``HexCode`` entry)
            is missing from the IR wave map.

    """
    key = "FUN_d0" if swing == ThermostatSwing.OFF else "FUN_d1"
    try:
        wave = self._ir_wave_map[key]
        command = wave["Para"] + "|" + wave["HexCode"]
    except KeyError as err:
        # A single-line message: the previous backslash continuation inside the
        # string literal leaked the source indentation into the log output.
        logger.error(
            'The special swing key "%s" does not exist in the IRSet database!',
            key,
        )
        raise RuntimeError(
            f'The special swing key "{key}"'
            " does not exist in the IRSet database!"
        ) from err

    return SwitcherBreezeCommand(
        "00000000" + hexlify(str(command).encode()).decode()
    )

SwitcherBreezeRemoteManager ⚓︎

Class for managing Breeze remotes.

Parameters:

Name Type Description Default
remotes_db_path str

optional path of supported remote json file

BREEZE_REMOTE_DB_FPATH
Source code in src/aioswitcher/api/remotes.py
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
class SwitcherBreezeRemoteManager:
    """Class for managing Breeze remotes.

    Remotes are parsed from the json database on first request and kept in an
    in-memory cache keyed by remote id.

    Args:
        remotes_db_path: optional path of supported remote json file

    """

    def __init__(self, remotes_db_path: str = BREEZE_REMOTE_DB_FPATH) -> None:
        """Initialize the Remote manager."""
        # cache of already parsed remotes, keyed by remote id
        self._remotes_db: Dict[str, SwitcherBreezeRemote] = {}
        self._remotes_db_fpath = remotes_db_path

    def get_remote(self, remote_id: str) -> SwitcherBreezeRemote:
        """Get Breeze remote by the remote id.

        Args:
            remote_id: the id of the desired remote

        Returns:
            an instance of ``SwitcherBreezeRemote``

        Raises:
            KeyError: presumably when ``remote_id`` is not an entry of the
                loaded json database (the lookup is not guarded).

        """
        # check if the remote was already loaded
        if remote_id not in self._remotes_db:
            # load the remote into the memory; the encoding is explicit so the
            # json file is decoded the same way regardless of the locale
            with open(self._remotes_db_fpath, encoding="utf-8") as remotes_fd:
                self._remotes_db[remote_id] = SwitcherBreezeRemote(
                    load(remotes_fd)[remote_id]
                )

        return self._remotes_db[remote_id]

__init__(remotes_db_path=BREEZE_REMOTE_DB_FPATH) ⚓︎

Initialize the Remote manager.

Source code in src/aioswitcher/api/remotes.py
416
417
418
419
def __init__(self, remotes_db_path: str = BREEZE_REMOTE_DB_FPATH) -> None:
    """Remember the database path and start with an empty remotes cache."""
    self._remotes_db_fpath = remotes_db_path
    # remotes that were already loaded, keyed by remote id
    self._remotes_db: Dict[str, SwitcherBreezeRemote] = {}

get_remote(remote_id) ⚓︎

Get Breeze remote by the remote id.

Parameters:

Name Type Description Default
remote_id str

the id of the desired remote

required

Returns:

Type Description
SwitcherBreezeRemote

an instance of SwitcherBreezeRemote

Source code in src/aioswitcher/api/remotes.py
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
def get_remote(self, remote_id: str) -> SwitcherBreezeRemote:
    """Get Breeze remote by the remote id.

    The remote is parsed from the json database on first request and served
    from the in-memory cache afterwards.

    Args:
        remote_id: the id of the desired remote

    Returns:
        an instance of ``SwitcherBreezeRemote``

    Raises:
        KeyError: presumably when ``remote_id`` is not an entry of the loaded
            json database (the lookup is not guarded).

    """
    # check if the remote was already loaded
    if remote_id not in self._remotes_db:
        # load the remote into the memory; the encoding is explicit so the
        # json file is decoded the same way regardless of the locale
        with open(self._remotes_db_fpath, encoding="utf-8") as remotes_fd:
            self._remotes_db[remote_id] = SwitcherBreezeRemote(
                load(remotes_fd)[remote_id]
            )

    return self._remotes_db[remote_id]

Switcher integration, UDP Bridge module.

DatagramParser dataclass ⚓︎

Utility class for parsing a datagram into various device properties.

Source code in src/aioswitcher/bridge.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
@final
@dataclass(frozen=True)
class DatagramParser:
    """Utility class for parsing a datagram into various device properties.

    Every getter slices a fixed offset out of ``message``; offsets into
    ``hexlify(self.message)`` count hex digits, i.e. two per message byte.
    """

    # the raw broadcast datagram
    message: bytes

    def is_switcher_originator(self) -> bool:
        """Verify the broadcast message had originated from a switcher device."""
        return hexlify(self.message)[0:4].decode() == "fef0" and (
            len(self.message) == 165
            or len(self.message) == 168  # Switcher Breeze
            or len(self.message) == 159  # Switcher Runner and RunnerMini
        )

    def get_ip_type1(self) -> str:
        """Extract the IP address from the type1 broadcast message (Heater, Plug)."""
        hex_ip = hexlify(self.message)[152:160]
        # the byte order is reversed here and reversed back by the "<L" packing
        ip_addr = int(hex_ip[6:8] + hex_ip[4:6] + hex_ip[2:4] + hex_ip[0:2], 16)
        return inet_ntoa(pack("<L", ip_addr))

    def get_ip_type2(self) -> str:
        """Extract the IP address from the broadcast message (Breeze, Runners)."""
        hex_ip = hexlify(self.message)[154:162]
        ip_addr = int(hex_ip[0:2] + hex_ip[2:4] + hex_ip[4:6] + hex_ip[6:8], 16)
        return inet_ntoa(pack(">L", ip_addr))

    def get_mac(self) -> str:
        """Extract the MAC address from the broadcast message.

        Returns:
            the address as upper-case, colon separated hex pairs.

        """
        hex_mac = hexlify(self.message)[160:172].decode().upper()
        return (
            hex_mac[0:2]
            + ":"
            + hex_mac[2:4]
            + ":"
            + hex_mac[4:6]
            + ":"
            + hex_mac[6:8]
            + ":"
            + hex_mac[8:10]
            + ":"
            + hex_mac[10:12]
        )

    def get_name(self) -> str:
        """Extract the device name from the broadcast message."""
        # the name field is padded with trailing NUL characters
        return self.message[42:74].decode().rstrip("\x00")

    def get_device_id(self) -> str:
        """Extract the device id from the broadcast message."""
        return hexlify(self.message)[36:42].decode()

    def get_device_key(self) -> str:
        """Extract the device key from the broadcast message."""
        return hexlify(self.message)[80:82].decode()

    def get_device_state(self) -> DeviceState:
        """Extract the device state from the broadcast message.

        Any value other than the ON value is reported as OFF.
        """
        hex_device_state = hexlify(self.message)[266:268].decode()
        return (
            DeviceState.ON
            if hex_device_state == DeviceState.ON.value
            else DeviceState.OFF
        )

    def get_auto_shutdown(self) -> str:
        """Extract the auto shutdown value from the broadcast message."""
        hex_auto_shutdown_val = hexlify(self.message)[310:318]
        # four bytes, least significant first
        int_auto_shutdown_val_secs = int(
            hex_auto_shutdown_val[6:8]
            + hex_auto_shutdown_val[4:6]
            + hex_auto_shutdown_val[2:4]
            + hex_auto_shutdown_val[0:2],
            16,
        )
        return seconds_to_iso_time(int_auto_shutdown_val_secs)

    def get_power_consumption(self) -> int:
        """Extract the power consumption from the broadcast message."""
        hex_power_consumption = hexlify(self.message)[270:278]
        # only the first two bytes are used, least significant first
        return int(hex_power_consumption[2:4] + hex_power_consumption[0:2], 16)

    def get_remaining(self) -> str:
        """Extract the time remaining for the current execution."""
        hex_remaining_time = hexlify(self.message)[294:302]
        # four bytes, least significant first
        int_remaining_time_seconds = int(
            hex_remaining_time[6:8]
            + hex_remaining_time[4:6]
            + hex_remaining_time[2:4]
            + hex_remaining_time[0:2],
            16,
        )
        return seconds_to_iso_time(int_remaining_time_seconds)

    def get_device_type(self) -> DeviceType:
        """Extract the device type from the broadcast message.

        Raises:
            KeyError: when the model bytes match no ``DeviceType.hex_rep``.

        """
        hex_model = hexlify(self.message[74:76]).decode()
        devices = dict(map(lambda d: (d.hex_rep, d), DeviceType))
        return devices[hex_model]

    # Switcher Runner and Runner Mini methods

    def get_shutter_position(self) -> int:
        """Return the current position of the shutter 0 <= pos <= 100."""
        hex_pos = hexlify(self.message[135:137]).decode()
        # Two bytes, least significant first. Both are parsed as hex: the high
        # byte used to be parsed as decimal and added unweighted, which raised
        # ValueError for any high byte containing a-f.
        return int(hex_pos[2:4] + hex_pos[0:2], 16)

    def get_shutter_direction(self) -> ShutterDirection:
        """Return the current direction of the shutter (UP/DOWN/STOP).

        Raises:
            KeyError: when the direction bytes match no ``ShutterDirection``.

        """
        hex_direction = hexlify(self.message[137:139]).decode()
        directions = dict(map(lambda d: (d.value, d), ShutterDirection))
        return directions[hex_direction]

    # Switcher Breeze methods

    def get_thermostat_temp(self) -> float:
        """Return the current temp of the thermostat."""
        hex_temp = hexlify(self.message[135:137]).decode()
        # two bytes, least significant first, holding tenths of a degree
        return int(hex_temp[2:4] + hex_temp[0:2], 16) / 10

    def get_thermostat_state(self) -> DeviceState:
        """Return the current thermostat state."""
        hex_power = hexlify(self.message[137:138]).decode()
        return DeviceState.ON if hex_power == DeviceState.ON.value else DeviceState.OFF

    def get_thermostat_mode(self) -> ThermostatMode:
        """Return the current thermostat mode, COOL when the value is unknown."""
        hex_mode = hexlify(self.message[138:139]).decode()
        states = dict(map(lambda s: (s.value, s), ThermostatMode))
        return ThermostatMode.COOL if hex_mode not in states else states[hex_mode]

    def get_thermostat_target_temp(self) -> int:
        """Return the target temperature of the thermostat."""
        hex_temp = hexlify(self.message[139:140]).decode()
        return int(hex_temp, 16)

    def get_thermostat_fan_level(self) -> ThermostatFanLevel:
        """Return the current thermostat fan level."""
        hex_level = hexlify(self.message[140:141]).decode()
        states = dict(map(lambda s: (s.value, s), ThermostatFanLevel))
        # the fan level is the first hex digit of the byte shared with the swing
        return states[hex_level[0:1]]

    def get_thermostat_swing(self) -> ThermostatSwing:
        """Return the current thermostat fan swing."""
        hex_swing = hexlify(self.message[140:141]).decode()

        # the swing is the second hex digit of the byte shared with the fan level
        return (
            ThermostatSwing.OFF
            if hex_swing[1:2] == ThermostatSwing.OFF.value
            else ThermostatSwing.ON
        )

    def get_thermostat_remote_id(self) -> str:
        """Return the current thermostat remote."""
        return self.message[143:151].decode()

get_auto_shutdown() ⚓︎

Extract the auto shutdown value from the broadcast message.

Source code in src/aioswitcher/bridge.py
344
345
346
347
348
349
350
351
352
353
354
def get_auto_shutdown(self) -> str:
    """Read the auto shutdown value and format it via ``seconds_to_iso_time``."""
    raw = hexlify(self.message)[310:318]
    # the four bytes arrive least significant first, so reverse the hex pairs
    seconds = int(b"".join(raw[start : start + 2] for start in (6, 4, 2, 0)), 16)
    return seconds_to_iso_time(seconds)

get_device_id() ⚓︎

Extract the device id from the broadcast message.

Source code in src/aioswitcher/bridge.py
327
328
329
def get_device_id(self) -> str:
    """Return the device id: message bytes 18-20 rendered as hex digits."""
    id_bytes = self.message[18:21]
    return hexlify(id_bytes).decode()

get_device_key() ⚓︎

Extract the device key from the broadcast message.

Source code in src/aioswitcher/bridge.py
331
332
333
def get_device_key(self) -> str:
    """Return the device key: message byte 40 rendered as two hex digits."""
    key_byte = self.message[40:41]
    return hexlify(key_byte).decode()

get_device_state() ⚓︎

Extract the device state from the broadcast message.

Source code in src/aioswitcher/bridge.py
335
336
337
338
339
340
341
342
def get_device_state(self) -> DeviceState:
    """Return ON when the state byte equals the ON value, OFF otherwise."""
    state_hex = hexlify(self.message[133:134]).decode()
    if state_hex == DeviceState.ON.value:
        return DeviceState.ON
    return DeviceState.OFF

get_device_type() ⚓︎

Extract the device type from the broadcast message.

Source code in src/aioswitcher/bridge.py
373
374
375
376
377
def get_device_type(self) -> DeviceType:
    """Map the two model bytes of the message onto a ``DeviceType`` member.

    Raises:
        KeyError: when no member has a matching ``hex_rep``.

    """
    model_hex = hexlify(self.message[74:76]).decode()
    # index the enum members by their hex representation
    by_hex_rep = {device.hex_rep: device for device in DeviceType}
    return by_hex_rep[model_hex]

get_ip_type1() ⚓︎

Extract the IP address from the type1 broadcast message (Heater, Plug).

Source code in src/aioswitcher/bridge.py
294
295
296
297
298
def get_ip_type1(self) -> str:
    """Return the dotted IP address found in a type1 message (Heater, Plug)."""
    hex_octets = hexlify(self.message)[152:160]
    # reverse the four hex pairs; the "<L" packing reverses them back
    reversed_pairs = [hex_octets[start : start + 2] for start in (6, 4, 2, 0)]
    packed_ip = pack("<L", int(b"".join(reversed_pairs), 16))
    return inet_ntoa(packed_ip)

get_ip_type2() ⚓︎

Extract the IP address from the broadcast message (Breeze, Runners).

Source code in src/aioswitcher/bridge.py
300
301
302
303
304
def get_ip_type2(self) -> str:
    """Return the dotted IP address found in a type2 message (Breeze, Runners)."""
    hex_octets = hexlify(self.message)[154:162]
    # the four hex pairs are already in network order, parse them as one number
    packed_ip = pack(">L", int(hex_octets, 16))
    return inet_ntoa(packed_ip)

get_mac() ⚓︎

Extract the MAC address from the broadcast message.

Source code in src/aioswitcher/bridge.py
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
def get_mac(self) -> str:
    """Return the MAC address as upper-case, colon separated hex pairs."""
    mac_digits = hexlify(self.message)[160:172].decode().upper()
    # split the twelve hex digits into six pairs
    return ":".join(mac_digits[start : start + 2] for start in range(0, 12, 2))

get_name() ⚓︎

Extract the device name from the broadcast message.

Source code in src/aioswitcher/bridge.py
323
324
325
def get_name(self) -> str:
    """Return the device name with its trailing NUL padding removed."""
    name_field = self.message[42:74]
    decoded_name = name_field.decode()
    return decoded_name.rstrip("\x00")

get_power_consumption() ⚓︎

Extract the power consumption from the broadcast message.

Source code in src/aioswitcher/bridge.py
356
357
358
359
def get_power_consumption(self) -> int:
    """Return the power consumption stored in the broadcast message."""
    hex_field = hexlify(self.message)[270:278]
    # only the first two bytes are used, least significant first
    low_byte, high_byte = hex_field[0:2], hex_field[2:4]
    return int(high_byte + low_byte, 16)

get_remaining() ⚓︎

Extract the time remaining for the current execution.

Source code in src/aioswitcher/bridge.py
361
362
363
364
365
366
367
368
369
370
371
def get_remaining(self) -> str:
    """Read the remaining run time and format it via ``seconds_to_iso_time``."""
    raw = hexlify(self.message)[294:302]
    # the four bytes arrive least significant first, so reverse the hex pairs
    seconds = int(b"".join(raw[start : start + 2] for start in (6, 4, 2, 0)), 16)
    return seconds_to_iso_time(seconds)

get_shutter_direction() ⚓︎

Return the current direction of the shutter (UP/DOWN/STOP).

Source code in src/aioswitcher/bridge.py
386
387
388
389
390
def get_shutter_direction(self) -> ShutterDirection:
    """Map the two direction bytes onto a ``ShutterDirection`` member.

    Raises:
        KeyError: when no member has a matching value.

    """
    direction_hex = hexlify(self.message[137:139]).decode()
    # index the enum members by their value
    by_value = {direction.value: direction for direction in ShutterDirection}
    return by_value[direction_hex]

get_shutter_position() ⚓︎

Return the current position of the shutter 0 <= pos <= 100.

Source code in src/aioswitcher/bridge.py
381
382
383
384
def get_shutter_position(self) -> int:
    """Return the current position of the shutter 0 <= pos <= 100.

    The position is read from message bytes 135-136 as a two byte value,
    least significant byte first.
    """
    hex_pos = hexlify(self.message[135:137]).decode()
    # Both bytes are parsed as hex: the high byte used to be parsed as decimal
    # and added unweighted, which raised ValueError for any high byte
    # containing a-f. Results for the documented 0-100 range are unchanged.
    return int(hex_pos[2:4] + hex_pos[0:2], 16)

get_thermostat_fan_level() ⚓︎

Return the current thermostat fan level.

Source code in src/aioswitcher/bridge.py
415
416
417
418
419
def get_thermostat_fan_level(self) -> ThermostatFanLevel:
    """Map the first hex digit of message byte 140 onto a fan level.

    Raises:
        KeyError: when no ``ThermostatFanLevel`` member has a matching value.

    """
    fan_swing_hex = hexlify(self.message[140:141]).decode()
    # index the enum members by their value
    by_value = {level.value: level for level in ThermostatFanLevel}
    return by_value[fan_swing_hex[0:1]]

get_thermostat_mode() ⚓︎

Return the current thermostat mode.

Source code in src/aioswitcher/bridge.py
404
405
406
407
408
def get_thermostat_mode(self) -> ThermostatMode:
    """Map message byte 138 onto a thermostat mode, COOL when it is unknown."""
    mode_hex = hexlify(self.message[138:139]).decode()
    # index the enum members by their value
    by_value = {mode.value: mode for mode in ThermostatMode}
    return by_value.get(mode_hex, ThermostatMode.COOL)

get_thermostat_remote_id() ⚓︎

Return the current thermostat remote.

Source code in src/aioswitcher/bridge.py
431
432
433
def get_thermostat_remote_id(self) -> str:
    """Return the remote id: message bytes 143-150 decoded as text."""
    remote_field = self.message[143:151]
    return remote_field.decode()

get_thermostat_state() ⚓︎

Return the current thermostat state.

Source code in src/aioswitcher/bridge.py
399
400
401
402
def get_thermostat_state(self) -> DeviceState:
    """Return ON when message byte 137 equals the ON value, OFF otherwise."""
    power_hex = hexlify(self.message[137:138]).decode()
    if power_hex == DeviceState.ON.value:
        return DeviceState.ON
    return DeviceState.OFF

get_thermostat_swing() ⚓︎

Return the current thermostat fan swing.

Source code in src/aioswitcher/bridge.py
421
422
423
424
425
426
427
428
429
def get_thermostat_swing(self) -> ThermostatSwing:
    """Return OFF when the swing digit equals the OFF value, ON otherwise."""
    fan_swing_hex = hexlify(self.message[140:141]).decode()
    # the swing is the second hex digit of message byte 140
    if fan_swing_hex[1:2] == ThermostatSwing.OFF.value:
        return ThermostatSwing.OFF
    return ThermostatSwing.ON

get_thermostat_target_temp() ⚓︎

Return the target temperature of the thermostat.

Source code in src/aioswitcher/bridge.py
410
411
412
413
def get_thermostat_target_temp(self) -> int:
    """Return the target temperature held in message byte 139."""
    target_byte = self.message[139:140]
    return int(hexlify(target_byte).decode(), 16)

get_thermostat_temp() ⚓︎

Return the current temp of the thermostat.

Source code in src/aioswitcher/bridge.py
394
395
396
397
def get_thermostat_temp(self) -> float:
    """Return the current temperature read from message bytes 135-136."""
    temp_hex = hexlify(self.message[135:137]).decode()
    # two bytes, least significant first, holding tenths of a degree
    low_byte, high_byte = temp_hex[0:2], temp_hex[2:4]
    return int(high_byte + low_byte, 16) / 10

is_switcher_originator() ⚓︎

Verify the broadcast message had originated from a switcher device.

Source code in src/aioswitcher/bridge.py
286
287
288
289
290
291
292
def is_switcher_originator(self) -> bool:
    """Tell whether the message carries the switcher prefix and a known length."""
    has_switcher_prefix = hexlify(self.message)[0:4].decode() == "fef0"
    # 165 is the base length, 168 is Switcher Breeze and
    # 159 is Switcher Runner and RunnerMini
    return has_switcher_prefix and len(self.message) in (165, 168, 159)

SwitcherBridge ⚓︎

Use for running a UDP client for bridging Switcher devices broadcast messages.

Parameters:

Name Type Description Default
on_device Callable[[SwitcherBase], Any]

a callable to which every new SwitcherBase device found will be sent.

required
broadcast_ports List[int]

broadcast ports list, default for type 1 devices is 20002, default for type 2 devices is 20003. On newer type1 devices, the port is 10002. On newer type2 devices, the port is 10003.

[SWITCHER_UDP_PORT_TYPE1, SWITCHER_UDP_PORT_TYPE1_NEW_VERSION, SWITCHER_UDP_PORT_TYPE2, SWITCHER_UDP_PORT_TYPE2_NEW_VERSION]
Source code in src/aioswitcher/bridge.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
@final
class SwitcherBridge:
    """Use for running a UDP client for bridging Switcher devices broadcast messages.

    The bridge can be driven with ``start``/``stop`` or used as an asynchronous
    context manager, which starts it on entry and stops it on exit.

    Args:
        on_device: a callable to which every new SwitcherBase device found will be sent.
        broadcast_ports: broadcast ports list, default for type 1 devices is 20002,
            default for type 2 devices is 20003.
            On newer type1 devices, the port is 10002.
            On newer type2 devices, the port is 10003.

    """

    def __init__(
        self,
        on_device: Callable[[SwitcherBase], Any],
        broadcast_ports: List[int] = [
            SWITCHER_UDP_PORT_TYPE1,
            SWITCHER_UDP_PORT_TYPE1_NEW_VERSION,
            SWITCHER_UDP_PORT_TYPE2,
            SWITCHER_UDP_PORT_TYPE2_NEW_VERSION,
        ],
    ) -> None:
        """Initialize the switcher bridge."""
        self._on_device = on_device
        # Keep a private copy: the default list object is shared by every call,
        # so storing it directly would alias state between bridge instances.
        self._broadcast_ports = list(broadcast_ports)
        self._is_running = False
        # one transport per listening port, filled in by start()
        self._transports: Dict[int, Optional[BaseTransport]] = {}

    async def __aenter__(self) -> "SwitcherBridge":
        """Enter SwitcherBridge asynchronous context manager."""
        await self.start()
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Exit the SwitcherBridge asynchronous context manager."""
        await self.stop()

    async def start(self) -> None:
        """Create an asynchronous listener per port and start the bridge.

        When opening one of the ports fails, the listeners opened by this call
        are closed again before the error is re-raised.
        """
        opened: List[BaseTransport] = []
        try:
            for broadcast_port in self._broadcast_ports:
                logger.info("starting the udp bridge on port %s", broadcast_port)
                protocol_factory = UdpClientProtocol(
                    partial(_parse_device_from_datagram, self._on_device)
                )
                transport, _ = await get_running_loop().create_datagram_endpoint(
                    lambda: protocol_factory,
                    local_addr=("0.0.0.0", broadcast_port),  # nosec
                    family=AF_INET,
                )
                opened.append(transport)
                self._transports[broadcast_port] = transport
                logger.debug("udp bridge on port %s started", broadcast_port)
        except BaseException:
            # Do not leak the sockets that were already bound; the caller never
            # gets a running bridge it could stop. Always re-raised.
            for transport in opened:
                transport.close()
            raise

        self._is_running = True

    async def stop(self) -> None:
        """Stop the asynchronous bridge."""
        for broadcast_port in self._broadcast_ports:
            transport = self._transports.get(broadcast_port)

            if transport and not transport.is_closing():
                logger.info("stopping the udp bridge on port %s", broadcast_port)
                transport.close()
            else:
                logger.info("udp bridge on port %s not started", broadcast_port)

        self._is_running = False

    @property
    def is_running(self) -> bool:
        """bool: Return true if bridge is running."""
        return self._is_running

is_running: bool property ⚓︎

bool: Return true if bridge is running.

__aenter__() async ⚓︎

Enter SwitcherBridge asynchronous context manager.

Source code in src/aioswitcher/bridge.py
197
198
199
200
async def __aenter__(self) -> "SwitcherBridge":
    """Start the bridge when entering ``async with`` and hand back the bridge."""
    # the listeners must be up before the body of the ``async with`` runs
    await self.start()
    return self

__aexit__(exc_type, exc_value, traceback) async ⚓︎

Exit the SwitcherBridge asynchronous context manager.

Source code in src/aioswitcher/bridge.py
202
203
204
205
206
207
208
209
async def __aexit__(
    self,
    exc_type: Optional[Type[BaseException]],
    exc_value: Optional[BaseException],
    traceback: Optional[TracebackType],
) -> None:
    """Stop the bridge when leaving ``async with``.

    The exception details are not inspected and nothing is returned, so an
    exception raised inside the ``async with`` body keeps propagating.
    """
    await self.stop()

__init__(on_device, broadcast_ports=[SWITCHER_UDP_PORT_TYPE1, SWITCHER_UDP_PORT_TYPE1_NEW_VERSION, SWITCHER_UDP_PORT_TYPE2, SWITCHER_UDP_PORT_TYPE2_NEW_VERSION]) ⚓︎

Initialize the switcher bridge.

Source code in src/aioswitcher/bridge.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def __init__(
    self,
    on_device: Callable[[SwitcherBase], Any],
    broadcast_ports: List[int] = [
        SWITCHER_UDP_PORT_TYPE1,
        SWITCHER_UDP_PORT_TYPE1_NEW_VERSION,
        SWITCHER_UDP_PORT_TYPE2,
        SWITCHER_UDP_PORT_TYPE2_NEW_VERSION,
    ],
) -> None:
    """Initialize the switcher bridge.

    Args:
        on_device: the callable stored for handling discovered devices.
        broadcast_ports: the UDP ports the bridge listens on once started.

    """
    self._on_device = on_device
    # Keep a private copy: the default list object is shared by every call,
    # so storing it directly would alias state between bridge instances.
    self._broadcast_ports = list(broadcast_ports)
    self._is_running = False
    # one transport per listening port, filled in when the bridge starts
    self._transports: Dict[int, Optional[BaseTransport]] = {}

start() async ⚓︎

Create an asynchronous listener and start the bridge.

Source code in src/aioswitcher/bridge.py
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
async def start(self) -> None:
    """Create an asynchronous listener and start the bridge.

    One udp endpoint is opened for every configured broadcast port. When
    opening any of them fails, or the call is cancelled, the endpoints opened
    so far are closed before the exception propagates, so a failed start
    does not leave sockets bound.
    """
    loop = get_running_loop()
    try:
        for broadcast_port in self._broadcast_ports:
            logger.info("starting the udp bridge on port %s", broadcast_port)
            protocol = UdpClientProtocol(
                partial(_parse_device_from_datagram, self._on_device)
            )
            # the awaited call returns a (transport, protocol) pair; only the
            # transport is kept, it is what stop() closes later on
            transport, _ = await loop.create_datagram_endpoint(
                lambda: protocol,
                local_addr=("0.0.0.0", broadcast_port),  # nosec
                family=AF_INET,
            )
            self._transports[broadcast_port] = transport
            logger.debug("udp bridge on port %s started", broadcast_port)
    except BaseException:
        # BaseException on purpose, cancellation must release the sockets too.
        # __aexit__ is not invoked when __aenter__ raises, so clean up here.
        for opened in self._transports.values():
            if opened and not opened.is_closing():
                opened.close()
        raise

    self._is_running = True

stop() async ⚓︎

Stop the asynchronous bridge.

Source code in src/aioswitcher/bridge.py
228
229
230
231
232
233
234
235
236
237
238
239
async def stop(self) -> None:
    """Close the endpoint of every configured port and mark the bridge stopped.

    Ports without a transport, or with one that is already closing, are only
    reported in the log.
    """
    for port in self._broadcast_ports:
        endpoint = self._transports.get(port)

        if not endpoint or endpoint.is_closing():
            logger.info("udp bridge on port %s not started", port)
            continue

        logger.info("stopping the udp bridge on port %s", port)
        endpoint.close()

    self._is_running = False

UdpClientProtocol ⚓︎

Bases: DatagramProtocol

Implementation of the Asyncio UDP DatagramProtocol.

Source code in src/aioswitcher/bridge.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
@final
class UdpClientProtocol(DatagramProtocol):
    """Implementation of the Asyncio UDP DatagramProtocol.

    The payload of every received datagram is forwarded to the ``on_datagram``
    callable given at construction; errors and disconnects are only reported.
    """

    def __init__(self, on_datagram: Callable[[bytes], None]) -> None:
        """Initialize the protocol.

        Args:
            on_datagram: callable invoked with the payload of each datagram.

        """
        self.transport: Optional[BaseTransport] = None
        self._on_datagram = on_datagram

    def connection_made(self, transport: BaseTransport) -> None:
        """Call on connection established, keeps a reference to the transport."""
        self.transport = transport

    def datagram_received(self, data: bytes, addr: Tuple[Any, Any]) -> None:
        """Call on datagram received, the sender address is not used."""
        self._on_datagram(data)

    def error_received(self, exc: Optional[Exception]) -> None:
        """Call on exception received.

        Args:
            exc: the received error, logged when truthy; otherwise a warning
                is issued.

        """
        if exc:
            # lazy %-style arguments: formatted only if the record is emitted
            logger.error("udp client received error %s", exc)
        else:
            warn("udp client received error")

    def connection_lost(self, exc: Optional[Exception]) -> None:
        """Call on connection lost.

        Args:
            exc: the reason for the lost connection, falsy for a regular stop.

        """
        if exc:
            # lazy %-style arguments: formatted only if the record is emitted
            logger.critical("udp bridge lost its connection %s", exc)
        else:
            logger.info("udp connection stopped")

__init__(on_datagram) ⚓︎

Initialize the protocol.

Source code in src/aioswitcher/bridge.py
251
252
253
254
def __init__(self, on_datagram: Callable[[bytes], None]) -> None:
    """Initialize the protocol.

    Args:
        on_datagram: callable stored on the protocol for handling datagrams.

    """
    # no transport yet, it is assigned by connection_made
    self.transport: Optional[BaseTransport] = None
    self._on_datagram = on_datagram

connection_lost(exc) ⚓︎

Call on connection lost.

Source code in src/aioswitcher/bridge.py
271
272
273
274
275
276
def connection_lost(self, exc: Optional[Exception]) -> None:
    """Call on connection lost.

    Args:
        exc: the reason for the lost connection, falsy for a regular stop.

    """
    if exc:
        # lazy %-style arguments: formatted only if the record is emitted
        logger.critical("udp bridge lost its connection %s", exc)
    else:
        logger.info("udp connection stopped")

connection_made(transport) ⚓︎

Call on connection established.

Source code in src/aioswitcher/bridge.py
256
257
258
def connection_made(self, transport: BaseTransport) -> None:
    """Call on connection established.

    Args:
        transport: the transport of the connection, kept on ``self.transport``.

    """
    self.transport = transport

datagram_received(data, addr) ⚓︎

Call on datagram received.

Source code in src/aioswitcher/bridge.py
260
261
262
def datagram_received(self, data: bytes, addr: Tuple[Any, Any]) -> None:
    """Call on datagram received.

    Args:
        data: the datagram payload, passed as is to the datagram callback.
        addr: the sender address, not used.

    """
    self._on_datagram(data)

error_received(exc) ⚓︎

Call on exception received.

Source code in src/aioswitcher/bridge.py
264
265
266
267
268
269
def error_received(self, exc: Optional[Exception]) -> None:
    """Call on exception received.

    Args:
        exc: the received error, logged when truthy; otherwise a warning is
            issued.

    """
    if exc:
        # lazy %-style arguments: formatted only if the record is emitted
        logger.error("udp client received error %s", exc)
    else:
        warn("udp client received error")

Switcher integration device module.

DeviceCategory ⚓︎

Bases: Enum

Enum for relaying the device category.

Source code in src/aioswitcher/device/__init__.py
24
25
26
27
28
29
30
31
@unique
class DeviceCategory(Enum):
    """Enum for relaying the device category.

    Member values are generated by ``auto()`` rather than written out.
    """

    WATER_HEATER = auto()
    POWER_PLUG = auto()
    THERMOSTAT = auto()
    SHUTTER = auto()

DeviceState ⚓︎

Bases: Enum

Enum class representing the device's state.

Source code in src/aioswitcher/device/__init__.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
@unique
class DeviceState(Enum):
    """Enumerate the states a device can be in.

    Every member is declared as a ``(value, display)`` pair, both parts are
    exposed through read-only properties.
    """

    ON = "01", "on"
    OFF = "00", "off"

    def __new__(cls, value: str, display: str) -> "DeviceState":
        """Create the member and attach its raw value and display name."""
        member = object.__new__(cls)
        member._display = display  # type: ignore
        member._value = value  # type: ignore
        return member

    @property
    def value(self) -> str:
        """Return the first item of the member declaration, the state value."""
        return self._value  # type: ignore

    @property
    def display(self) -> str:
        """Return the second item of the member declaration, the display name."""
        return self._display  # type: ignore

display: str property ⚓︎

Return the display name of the state.

value: str property ⚓︎

Return the value of the state.

__new__(value, display) ⚓︎

Override the default enum constructor and include extra properties.

Source code in src/aioswitcher/device/__init__.py
87
88
89
90
91
92
def __new__(cls, value: str, display: str) -> "DeviceState":
    """Create the member and attach its raw value and display name."""
    member = object.__new__(cls)
    member._display = display  # type: ignore
    member._value = value  # type: ignore
    return member

DeviceType ⚓︎

Bases: Enum

Enum for relaying the type of the switcher devices.

Source code in src/aioswitcher/device/__init__.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
@unique
class DeviceType(Enum):
    """Enumerate the supported switcher device types.

    Every member is declared as a
    ``(value, hex_rep, protocol_type, category)`` tuple, each part is exposed
    through a read-only property.
    """

    MINI = "Switcher Mini", "030f", 1, DeviceCategory.WATER_HEATER
    POWER_PLUG = "Switcher Power Plug", "01a8", 1, DeviceCategory.POWER_PLUG
    TOUCH = "Switcher Touch", "030b", 1, DeviceCategory.WATER_HEATER
    V2_ESP = "Switcher V2 (esp)", "01a7", 1, DeviceCategory.WATER_HEATER
    V2_QCA = "Switcher V2 (qualcomm)", "01a1", 1, DeviceCategory.WATER_HEATER
    V4 = "Switcher V4", "0317", 1, DeviceCategory.WATER_HEATER
    BREEZE = "Switcher Breeze", "0e01", 2, DeviceCategory.THERMOSTAT
    RUNNER = "Switcher Runner", "0c01", 2, DeviceCategory.SHUTTER
    RUNNER_MINI = "Switcher Runner Mini", "0c02", 2, DeviceCategory.SHUTTER

    def __new__(
        cls, value: str, hex_rep: str, protocol_type: int, category: DeviceCategory
    ) -> "DeviceType":
        """Create the member and attach every part of its declaration."""
        member = object.__new__(cls)
        member._category = category  # type: ignore
        member._protocol_type = protocol_type  # type: ignore
        member._hex_rep = hex_rep  # type: ignore
        member._value = value  # type: ignore
        return member

    @property
    def category(self) -> DeviceCategory:
        """Return the category the device type belongs to."""
        return self._category  # type: ignore

    @property
    def protocol_type(self) -> int:
        """Return the protocol type used by the device type."""
        return self._protocol_type  # type: ignore

    @property
    def hex_rep(self) -> str:
        """Return the hexadecimal representation of the device type."""
        return self._hex_rep  # type: ignore

    @property
    def value(self) -> str:
        """Return the value of the device type, the first declared item."""
        return self._value  # type: ignore

category: DeviceCategory property ⚓︎

Return the category of the device type.

hex_rep: str property ⚓︎

Return the hexadecimal representation of the device type.

protocol_type: int property ⚓︎

Return the protocol type of the device.

value: str property ⚓︎

Return the value of the device type.

__new__(value, hex_rep, protocol_type, category) ⚓︎

Override the default enum constructor and include extra properties.

Source code in src/aioswitcher/device/__init__.py
48
49
50
51
52
53
54
55
56
57
def __new__(
    cls, value: str, hex_rep: str, protocol_type: int, category: DeviceCategory
) -> "DeviceType":
    """Create the member and attach every part of its declaration."""
    member = object.__new__(cls)
    member._category = category  # type: ignore
    member._protocol_type = protocol_type  # type: ignore
    member._hex_rep = hex_rep  # type: ignore
    member._value = value  # type: ignore
    return member

ShutterDirection ⚓︎

Bases: Enum

Enum class representing the shutter device's direction.

Source code in src/aioswitcher/device/__init__.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
@final
class ShutterDirection(Enum):
    """Enumerate the directions a shutter device can report.

    Every member is declared as a ``(value, display)`` pair, both parts are
    exposed through read-only properties.
    """

    SHUTTER_STOP = "0000", "stop"
    SHUTTER_UP = "0100", "up"
    SHUTTER_DOWN = "0001", "down"

    def __new__(cls, value: str, display: str) -> "ShutterDirection":
        """Create the member and attach its raw value and display name."""
        member = object.__new__(cls)
        member._display = display  # type: ignore
        member._value = value  # type: ignore
        return member

    @property
    def value(self) -> str:
        """Return the first item of the member declaration, the direction value."""
        return self._value  # type: ignore

    @property
    def display(self) -> str:
        """Return the second item of the member declaration, the display name."""
        return self._display  # type: ignore

display: str property ⚓︎

Return the display name of the direction.

value: str property ⚓︎

Return the value of the direction.

__new__(value, display) ⚓︎

Override the default enum constructor and include extra properties.

Source code in src/aioswitcher/device/__init__.py
190
191
192
193
194
195
def __new__(cls, value: str, display: str) -> "ShutterDirection":
    """Create the member and attach its raw value and display name."""
    member = object.__new__(cls)
    member._display = display  # type: ignore
    member._value = value  # type: ignore
    return member

SwitcherBase dataclass ⚓︎

Bases: ABC

Abstraction for all switcher devices.

Parameters:

Name Type Description Default
device_type DeviceType

the DeviceType appropriate member.

required
device_state DeviceState

the DeviceState appropriate member.

required
device_id str

the id retrieved from the device.

required
device_key str

the login key of the device.

required
ip_address str

the ip address assigned to the device.

required
mac_address str

the mac address assigned to the device.

required
name str

the name of the device.

required
Source code in src/aioswitcher/device/__init__.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
@dataclass
class SwitcherBase(ABC):
    """Abstraction for all switcher devices.

    Args:
        device_type: the DeviceType appropriate member.
        device_state: the DeviceState appropriate member.
        device_id: the id retrieved from the device.
        device_key: the login key of the device.
        ip_address: the ip address assigned to the device.
        mac_address: the mac address assigned to the device.
        name: the name of the device.

    Attributes:
        last_data_update: not an instantiation parameter, it is set to the
            instantiation datetime by ``__post_init__``.

    """

    device_type: DeviceType
    device_state: DeviceState
    device_id: str
    device_key: str
    ip_address: str
    mac_address: str
    name: str
    # excluded from the generated __init__, assigned in __post_init__
    last_data_update: datetime = field(init=False)

    def __post_init__(self) -> None:
        """Post initialization, set last_data_update to the instantiation datetime."""
        # now() is called without a timezone argument
        self.last_data_update = datetime.now()

__post_init__() ⚓︎

Post initialization, set last_data_update to the instantiation datetime.

Source code in src/aioswitcher/device/__init__.py
232
233
234
def __post_init__(self) -> None:
    """Post initialization, set last_data_update to the instantiation datetime."""
    # now() is called without a timezone argument
    self.last_data_update = datetime.now()

SwitcherPowerBase dataclass ⚓︎

Bases: ABC

Abstraction for all switcher devices reporting power data.

Parameters:

Name Type Description Default
power_consumption int

the current power consumption in watts.

required
electric_current float

the current electric current in amps.

required
Source code in src/aioswitcher/device/__init__.py
237
238
239
240
241
242
243
244
245
246
247
248
@dataclass
class SwitcherPowerBase(ABC):
    """Abstraction for all switcher devices reporting power data.

    Args:
        power_consumption: the current power consumption in watts.
        electric_current: the current electric current in amps.

    """

    power_consumption: int
    electric_current: float

SwitcherPowerPlug dataclass ⚓︎

Bases: SwitcherPowerBase, SwitcherBase

Implementation of the Switcher Power Plug device.

Please Note the order of the inherited classes to understand the order of the instantiation parameters and the super call.

Source code in src/aioswitcher/device/__init__.py
304
305
306
307
308
309
310
311
312
313
314
315
316
317
@final
@dataclass
class SwitcherPowerPlug(SwitcherPowerBase, SwitcherBase):
    """Implementation of the Switcher Power Plug device.

    Please Note the order of the inherited classes to understand the order of the
    instantiation parameters and the super call.

    Raises:
        ValueError: when the category of ``device_type`` is not POWER_PLUG.
    """

    def __post_init__(self) -> None:
        """Post initialization validate device type category as POWER_PLUG."""
        if self.device_type.category != DeviceCategory.POWER_PLUG:
            raise ValueError("only power plugs are allowed")
        # validation passed, continue with the inherited post initialization
        super().__post_init__()

__post_init__() ⚓︎

Post initialization validate device type category as POWER_PLUG.

Source code in src/aioswitcher/device/__init__.py
313
314
315
316
317
def __post_init__(self) -> None:
    """Post initialization validate device type category as POWER_PLUG.

    Raises:
        ValueError: when the category of ``device_type`` is not POWER_PLUG.
    """
    if self.device_type.category != DeviceCategory.POWER_PLUG:
        raise ValueError("only power plugs are allowed")
    # validation passed, continue with the inherited post initialization
    super().__post_init__()

SwitcherShutter dataclass ⚓︎

Bases: SwitcherShutterBase, SwitcherBase

Implementation of the Switcher Shutter device.

Source code in src/aioswitcher/device/__init__.py
349
350
351
352
353
354
355
356
357
358
@final
@dataclass
class SwitcherShutter(SwitcherShutterBase, SwitcherBase):
    """Implementation of the Switcher Shutter device.

    Raises:
        ValueError: when the category of ``device_type`` is not SHUTTER.
    """

    def __post_init__(self) -> None:
        """Post initialization validate device type category as SHUTTER."""
        if self.device_type.category != DeviceCategory.SHUTTER:
            raise ValueError("only shutters are allowed")
        # validation passed, continue with the inherited post initialization
        return super().__post_init__()

__post_init__() ⚓︎

Post initialization validate device type category as SHUTTER.

Source code in src/aioswitcher/device/__init__.py
354
355
356
357
358
def __post_init__(self) -> None:
    """Post initialization validate device type category as SHUTTER.

    Raises:
        ValueError: when the category of ``device_type`` is not SHUTTER.
    """
    if self.device_type.category != DeviceCategory.SHUTTER:
        raise ValueError("only shutters are allowed")
    # validation passed, continue with the inherited post initialization
    return super().__post_init__()

SwitcherShutterBase dataclass ⚓︎

Bases: ABC

Abstraction for all switcher devices controlling shutter.

Parameters:

Name Type Description Default
position int

the current position of the shutter (integer percentage).

required
direction ShutterDirection

the current direction of the shutter.

required
Source code in src/aioswitcher/device/__init__.py
291
292
293
294
295
296
297
298
299
300
301
@dataclass
class SwitcherShutterBase(ABC):
    """Abstraction for all switcher devices controlling shutter.

    Both fields are required, neither declares a default value.

    Args:
        position: the current position of the shutter (integer percentage).
        direction: the current direction of the shutter.
    """

    position: int
    direction: ShutterDirection

SwitcherThermostat dataclass ⚓︎

Bases: SwitcherThermostatBase, SwitcherBase

Implementation of the Switcher Thermostat device.

Source code in src/aioswitcher/device/__init__.py
336
337
338
339
340
341
342
343
344
345
346
@final
@dataclass
class SwitcherThermostat(SwitcherThermostatBase, SwitcherBase):
    """Implementation of the Switcher Thermostat device.

    Raises:
        ValueError: when the category of ``device_type`` is not THERMOSTAT.
    """

    def __post_init__(self) -> None:
        """Post initialization validate device type category as THERMOSTAT."""
        if self.device_type.category != DeviceCategory.THERMOSTAT:
            raise ValueError("only thermostats are allowed")
        # plain instance attribute, not a dataclass field; it starts empty
        # NOTE(review): presumably assigned later by the caller - confirm
        self.remote = None
        return super().__post_init__()

__post_init__() ⚓︎

Post initialization validate device type category as THERMOSTAT.

Source code in src/aioswitcher/device/__init__.py
341
342
343
344
345
346
def __post_init__(self) -> None:
    """Post initialization validate device type category as THERMOSTAT.

    Raises:
        ValueError: when the category of ``device_type`` is not THERMOSTAT.
    """
    if self.device_type.category != DeviceCategory.THERMOSTAT:
        raise ValueError("only thermostats are allowed")
    # the remote attribute starts empty
    # NOTE(review): presumably assigned later by the caller - confirm
    self.remote = None
    return super().__post_init__()

SwitcherThermostatBase dataclass ⚓︎

Bases: ABC

Abstraction for switcher thermostat devices.

Parameters:

Name Type Description Default
mode ThermostatMode

the mode of the thermostat.

required
temperature float

the current temperature in celsius.

required
target_temperature int

the current target temperature in celsius.

required
fan_level ThermostatFanLevel

the current fan level.

required
swing ThermostatSwing

the current swing state.

required
remote_id str

the id of the remote used to control this thermostat

required
Source code in src/aioswitcher/device/__init__.py
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
@dataclass
class SwitcherThermostatBase(ABC):
    """Abstraction for switcher thermostat devices.

    Args:
        mode: the mode of the thermostat.
        temperature: the current temperature in celsius.
        target_temperature: the current target temperature in celsius.
        fan_level: the current fan level.
        swing: the current swing state.
        remote_id: the id of the remote used to control this thermostat.
    """

    mode: ThermostatMode
    temperature: float
    target_temperature: int
    fan_level: ThermostatFanLevel
    swing: ThermostatSwing
    remote_id: str

SwitcherTimedBase dataclass ⚓︎

Bases: ABC

Abstraction for all switcher devices supporting timed operations.

Parameters:

Name Type Description Default
remaining_time str

remaining time to current run.

required
auto_shutdown str

configured value for auto shutdown.

required
Source code in src/aioswitcher/device/__init__.py
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
@dataclass
class SwitcherTimedBase(ABC):
    """Abstraction for all switcher devices supporting timed operations.

    Args:
        remaining_time: remaining time to current run.
        auto_shutdown: configured value for auto shutdown.

    """

    remaining_time: str
    auto_shutdown: str

    @property
    def auto_off_set(self) -> str:
        """Fix for backward compatibility issues with home assistant."""
        # read-only alias, returns auto_shutdown unchanged
        return self.auto_shutdown

auto_off_set: str property ⚓︎

Fix for backward compatibility issues with home assistant.

SwitcherWaterHeater dataclass ⚓︎

Bases: SwitcherTimedBase, SwitcherPowerBase, SwitcherBase

Implementation of the Switcher Water Heater device.

Please Note the order of the inherited classes to understand the order of the instantiation parameters and the super call.

Source code in src/aioswitcher/device/__init__.py
320
321
322
323
324
325
326
327
328
329
330
331
332
333
@final
@dataclass
class SwitcherWaterHeater(SwitcherTimedBase, SwitcherPowerBase, SwitcherBase):
    """Implementation of the Switcher Water Heater device.

    Please Note the order of the inherited classes to understand the order of the
    instantiation parameters and the super call.

    Raises:
        ValueError: when the category of ``device_type`` is not WATER_HEATER.
    """

    def __post_init__(self) -> None:
        """Post initialization validate device type category as WATER_HEATER."""
        if self.device_type.category != DeviceCategory.WATER_HEATER:
            raise ValueError("only water heaters are allowed")
        # validation passed, continue with the inherited post initialization
        super().__post_init__()

__post_init__() ⚓︎

Post initialization validate device type category as WATER_HEATER.

Source code in src/aioswitcher/device/__init__.py
329
330
331
332
333
def __post_init__(self) -> None:
    """Post initialization validate device type category as WATER_HEATER.

    Raises:
        ValueError: when the category of ``device_type`` is not WATER_HEATER.
    """
    if self.device_type.category != DeviceCategory.WATER_HEATER:
        raise ValueError("only water heaters are allowed")
    # validation passed, continue with the inherited post initialization
    super().__post_init__()

ThermostatFanLevel ⚓︎

Bases: Enum

Enum class representing the thermostat device's fan level.

Source code in src/aioswitcher/device/__init__.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
class ThermostatFanLevel(Enum):
    """Enumerate the fan levels of a thermostat device.

    Every member is declared as a ``(value, display)`` pair, both parts are
    exposed through read-only properties.
    """

    LOW = "1", "low"
    MEDIUM = "2", "medium"
    HIGH = "3", "high"
    AUTO = "0", "auto"

    def __new__(cls, value: str, display: str) -> "ThermostatFanLevel":
        """Create the member and attach its raw value and display name."""
        member = object.__new__(cls)
        member._display = display  # type: ignore
        member._value = value  # type: ignore
        return member

    @property
    def value(self) -> str:
        """Return the first item of the member declaration, the fan level value."""
        return self._value  # type: ignore

    @property
    def display(self) -> str:
        """Return the second item of the member declaration, the display name."""
        return self._display  # type: ignore

display: str property ⚓︎

Return the display name of the fan level.

value: str property ⚓︎

Return the value of the fan level.

__new__(value, display) ⚓︎

Override the default enum constructor and include extra properties.

Source code in src/aioswitcher/device/__init__.py
140
141
142
143
144
145
def __new__(cls, value: str, display: str) -> "ThermostatFanLevel":
    """Create the member and attach its raw value and display name."""
    member = object.__new__(cls)
    member._display = display  # type: ignore
    member._value = value  # type: ignore
    return member

ThermostatMode ⚓︎

Bases: Enum

Enum class representing the thermostat device's mode.

Source code in src/aioswitcher/device/__init__.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
class ThermostatMode(Enum):
    """Enumerate the operation modes of a thermostat device.

    Every member is declared as a ``(value, display)`` pair, both parts are
    exposed through read-only properties.
    """

    AUTO = "01", "auto"
    DRY = "02", "dry"
    FAN = "03", "fan"
    COOL = "04", "cool"
    HEAT = "05", "heat"

    def __new__(cls, value: str, display: str) -> "ThermostatMode":
        """Create the member and attach its raw value and display name."""
        member = object.__new__(cls)
        member._display = display  # type: ignore
        member._value = value  # type: ignore
        return member

    @property
    def value(self) -> str:
        """Return the first item of the member declaration, the mode value."""
        return self._value  # type: ignore

    @property
    def display(self) -> str:
        """Return the second item of the member declaration, the display name."""
        return self._display  # type: ignore

display: str property ⚓︎

Return the display name of the mode.

value: str property ⚓︎

Return the value of the mode.

__new__(value, display) ⚓︎

Override the default enum constructor and include extra properties.

Source code in src/aioswitcher/device/__init__.py
114
115
116
117
118
119
def __new__(cls, value: str, display: str) -> "ThermostatMode":
    """Create the member and attach its raw value and display name."""
    member = object.__new__(cls)
    member._display = display  # type: ignore
    member._value = value  # type: ignore
    return member

ThermostatSwing ⚓︎

Bases: Enum

Enum class representing the thermostat device's swing state.

Source code in src/aioswitcher/device/__init__.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
class ThermostatSwing(Enum):
    """Enumerate the swing states of a thermostat device.

    Every member is declared as a ``(value, display)`` pair, both parts are
    exposed through read-only properties.
    """

    OFF = "0", "off"
    ON = "1", "on"

    def __new__(cls, value: str, display: str) -> "ThermostatSwing":
        """Create the member and attach its raw value and display name."""
        member = object.__new__(cls)
        member._display = display  # type: ignore
        member._value = value  # type: ignore
        return member

    @property
    def value(self) -> str:
        """Return the first item of the member declaration, the swing value."""
        return self._value  # type: ignore

    @property
    def display(self) -> str:
        """Return the second item of the member declaration, the display name."""
        return self._display  # type: ignore

display: str property ⚓︎

Return the display name of the swing.

value: str property ⚓︎

Return the value of the swing.

__new__(value, display) ⚓︎

Override the default enum constructor and include extra properties.

Source code in src/aioswitcher/device/__init__.py
164
165
166
167
168
169
def __new__(cls, value: str, display: str) -> "ThermostatSwing":
    """Create the member and attach its raw value and display name."""
    member = object.__new__(cls)
    member._display = display  # type: ignore
    member._value = value  # type: ignore
    return member

Switcher integration device module tools.

current_timestamp_to_hexadecimal() ⚓︎

Generate hexadecimal representation of the current timestamp.

Return

Hexadecimal representation of the current unix time retrieved by time.time.

Source code in src/aioswitcher/device/tools.py
114
115
116
117
118
119
120
121
122
123
124
def current_timestamp_to_hexadecimal() -> str:
    """Encode the current unix time as hexadecimal.

    The time returned by ``time.time`` is rounded to whole seconds and packed
    as a little endian unsigned int.

    Return:
        Hexadecimal representation of the current unix time.

    """
    packed_now = pack("<I", int(round(time.time())))
    return hexlify(packed_now).decode()

minutes_to_hexadecimal_seconds(minutes) ⚓︎

Encode minutes, as seconds, to a hexadecimal value packed as a little endian unsigned int.

Parameters:

Name Type Description Default
minutes int

minutes to encode.

required
Return

Hexadecimal representation of the minutes argument.

Source code in src/aioswitcher/device/tools.py
63
64
65
66
67
68
69
70
71
72
73
def minutes_to_hexadecimal_seconds(minutes: int) -> str:
    """Encode minutes, as seconds, to a little endian unsigned int in hexadecimal.

    Args:
        minutes: minutes to encode.

    Return:
        Hexadecimal representation of the minutes argument converted to seconds.

    """
    seconds = minutes * 60
    packed_seconds = pack("<I", seconds)
    return hexlify(packed_seconds).decode()

seconds_to_iso_time(all_seconds) ⚓︎

Convert seconds to iso time.

Parameters:

Name Type Description Default
all_seconds int

the total number of seconds to convert.

required
Return

A string representing the converted iso time in %H:%M:%S format. e.g. "02:24:37".

Source code in src/aioswitcher/device/tools.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def seconds_to_iso_time(all_seconds: int) -> str:
    """Format a number of seconds as an iso time string.

    Args:
        all_seconds: the total number of seconds to convert.

    Return:
        The time in %H:%M:%S format, e.g. "02:24:37".

    """
    hours, remainder = divmod(int(all_seconds), 3600)
    minutes, seconds = divmod(remainder, 60)

    converted = datetime.time(hours, minutes, seconds)
    return converted.isoformat()

set_message_length(message) ⚓︎

Set the message length.

Source code in src/aioswitcher/device/tools.py
132
133
134
135
def set_message_length(message: str) -> str:
    """Set the message length.

    Rewrites the first 4 bytes of a hexadecimal packet with the ``fef0``
    prefix followed by the length as a little endian unsigned short.

    Args:
        message: the hexadecimal packet, its first 8 characters are replaced.

    Return:
        The packet with its first 4 bytes rewritten.

    """
    # 4 bytes are counted on top of the message itself
    # NOTE(review): presumably the signature appended when signing - confirm
    length = len(unhexlify(message + "00000000"))
    # Two little endian bytes. Left-justifying the plain hexadecimal number,
    # as done before, was only correct for lengths between 0x10 and 0xff.
    return "fef0" + length.to_bytes(2, "little").hex() + message[8:]

sign_packet_with_crc_key(hex_packet) ⚓︎

Sign the packets with the designated crc key.

Parameters:

Name Type Description Default
hex_packet str

packet to sign.

required
Return

The calculated and signed packet.

Source code in src/aioswitcher/device/tools.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def sign_packet_with_crc_key(hex_packet: str) -> str:
    """Append the crc signatures to a hexadecimal packet.

    Two signatures are appended: the crc of the packet itself, then the crc
    of that first signature followed by 32 ``0x30`` bytes.

    Args:
        hex_packet: packet to sign.

    Return:
        The packet followed by both signatures.

    """

    def crc_as_hex(payload: bytes) -> str:
        """Return the two low crc bytes of payload, low byte first, as hex."""
        crc_hex = hexlify(pack(">I", crc_hqx(payload, 0x1021))).decode()
        return crc_hex[6:8] + crc_hex[4:6]

    packet_signature = crc_as_hex(unhexlify(hex_packet))
    key_signature = crc_as_hex(unhexlify(packet_signature + "30" * 32))

    return "".join((hex_packet, packet_signature, key_signature))

string_to_hexadecimale_device_name(name) ⚓︎

Encode string device name to an appropriate hexadecimal value.

Parameters:

Name Type Description Default
name str

the desired name for encoding.

required
Return

Hexadecimal representation of the name argument.

Source code in src/aioswitcher/device/tools.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
def string_to_hexadecimale_device_name(name: str) -> str:
    """Encode string device name to an appropriate hexadecimal value.

    The encoded name is padded with zero bytes to exactly 32 bytes.

    Args:
        name: the desired name for encoding.

    Return:
        Hexadecimal representation of the name argument.

    Raises:
        ValueError: when the name is shorter than 2 characters or its encoded
            form is longer than 32 bytes.

    """
    encoded_name = name.encode()
    # Measure the encoded bytes, not the characters: a non-ascii character
    # takes more than one byte, so padding by the character count overflowed
    # the 32 bytes the name is padded to.
    if len(name) < 2 or len(encoded_name) > 32:
        raise ValueError("name length can vary from 2 to 32")
    return hexlify(encoded_name.ljust(32, b"\x00")).decode()

timedelta_to_hexadecimal_seconds(full_time) ⚓︎

Encode timedelta as seconds to an hexadecimal packed as little endian unsigned.

Parameters:

Name Type Description Default
full_time datetime.timedelta

timedelta time between 1 and 24 hours, seconds are ignored.

required
Return

Hexadecimal representation of the seconds built from the full_time argument.

Source code in src/aioswitcher/device/tools.py
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def timedelta_to_hexadecimal_seconds(full_time: datetime.timedelta) -> str:
    """Encode a timedelta, as seconds, to a little endian unsigned int in hexadecimal.

    Args:
        full_time: timedelta time between 1 and 24 hours, seconds are ignored.

    Return:
        Hexadecimal representation of the seconds built from the full_time argument.

    Raises:
        ValueError: when the whole minutes of full_time are out of range.

    """
    whole_hours, leftover_minutes = divmod(full_time.total_seconds() / 60, 60)
    # truncating to whole minutes is what drops the seconds
    seconds = int(whole_hours) * 3600 + int(leftover_minutes) * 60

    if not 3599 < seconds < 86341:
        raise ValueError("can only handle 1 to 24 hours")

    return hexlify(pack("<I", seconds)).decode()

watts_to_amps(watts) ⚓︎

Convert power consumption in watts to electric current in amps.

Source code in src/aioswitcher/device/tools.py
127
128
129
def watts_to_amps(watts: int) -> float:
    """Convert power consumption in watts to electric current in amps.

    The watts are divided by 220 and rounded to one decimal place.
    """
    amps = watts / 220.0
    return round(amps, 1)

Switcher integration schedule module.

Days ⚓︎

Bases: Enum

Enum class representing the day entity.

Source code in src/aioswitcher/schedule/__init__.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@unique
class Days(Enum):
    """Days of the week, each carrying a bit flag and a weekday index.

    Every member is declared as ``(name, hex_rep, bit_rep, weekday)``. The name
    becomes the member's ``value``; ``hex_rep`` and ``bit_rep`` hold the same
    number written in hexadecimal and decimal form; ``weekday`` counts from 0
    for Monday to 6 for Sunday.
    """

    MONDAY = ("Monday", 0x02, 2, 0)
    TUESDAY = ("Tuesday", 0x04, 4, 1)
    WEDNESDAY = ("Wednesday", 0x08, 8, 2)
    THURSDAY = ("Thursday", 0x10, 16, 3)
    FRIDAY = ("Friday", 0x20, 32, 4)
    SATURDAY = ("Saturday", 0x40, 64, 5)
    SUNDAY = ("Sunday", 0x80, 128, 6)

    def __new__(cls, value: str, hex_rep: int, bit_rep: int, weekday: int) -> "Days":
        """Create the member with the name as its value and attach the extras."""
        member = object.__new__(cls)
        # only the name is the enum value, so lookups such as Days("Monday") work
        member._value_ = value
        member._hex_rep = hex_rep  # type: ignore
        member._bit_rep = bit_rep  # type: ignore
        member._weekday = weekday  # type: ignore
        return member

    @property
    def hex_rep(self) -> int:
        """Return the day's flag as given in hexadecimal form."""
        return self._hex_rep  # type: ignore

    @property
    def bit_rep(self) -> int:
        """Return the day's flag as given in decimal form."""
        return self._bit_rep  # type: ignore

    @property
    def weekday(self) -> int:
        """Return the weekday index of the day, 0 for Monday up to 6."""
        return self._weekday  # type: ignore

bit_rep: int property ⚓︎

Return the bit representation of the day.

hex_rep: int property ⚓︎

Return the hexadecimal representation of the day.

weekday: int property ⚓︎

Return the weekday of the day.

__new__(value, hex_rep, bit_rep, weekday) ⚓︎

Override the default enum constructor and include extra properties.

Source code in src/aioswitcher/schedule/__init__.py
40
41
42
43
44
45
46
47
def __new__(cls, value: str, hex_rep: int, bit_rep: int, weekday: int) -> "Days":
    """Create the member with the name as its value and attach the extras."""
    member = object.__new__(cls)
    # only the first tuple item becomes the enum value
    member._value_ = value
    member._hex_rep = hex_rep  # type: ignore
    member._bit_rep = bit_rep  # type: ignore
    member._weekday = weekday  # type: ignore
    return member

ScheduleState ⚓︎

Bases: Enum

Enum representing the status of the schedule.

Source code in src/aioswitcher/schedule/__init__.py
20
21
22
23
24
25
@unique
class ScheduleState(Enum):
    """Enum representing the status of the schedule.

    Each member's value is a two-character string, "01" or "00".
    """

    ENABLED = "01"
    DISABLED = "00"

Switcher integration schedule parser module.

ScheduleParser dataclass ⚓︎

Schedule parsing tool.

Source code in src/aioswitcher/schedule/parser.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
@final
@dataclass(frozen=True)
class ScheduleParser:
    """Decode the fields of a single hex-encoded schedule record.

    The record is read at fixed offsets: id ``[0:2]``, enabled flag ``[2:4]``,
    weekdays bit sum ``[4:6]``, state ``[6:8]``, start time ``[8:16]`` and end
    time ``[16:24]``.
    """

    schedule: bytes

    def get_id(self) -> str:
        """Return the schedule id as a decimal string."""
        id_hex = self.schedule[0:2]
        return str(int(id_hex, 16))

    def is_enabled(self) -> bool:
        """Return true if the enabled flag equals 1."""
        enabled_flag = int(self.schedule[2:4], 16)
        return enabled_flag == 1

    def is_recurring(self) -> bool:
        """Return true if the weekdays field is anything other than "00"."""
        weekdays_hex = self.schedule[4:6]
        return weekdays_hex != b"00"

    def get_days(self) -> Set[Days]:
        """Return the set of scheduled Days, empty when not recurring."""
        if not self.is_recurring():
            return set()
        weekdays_sum = int(self.schedule[4:6], 16)
        return tools.bit_summary_to_days(weekdays_sum)

    def get_state(self) -> ScheduleState:
        """Return the current state of the device.

        Not sure if this needs to be included in the schedule object.
        """
        state_code = self.schedule[6:8].decode()
        return ScheduleState(state_code)

    def get_start_time(self) -> str:
        """Return the schedule start time in %H:%M format."""
        start_hex = self.schedule[8:16]
        return tools.hexadecimale_timestamp_to_localtime(start_hex)

    def get_end_time(self) -> str:
        """Return the schedule end time in %H:%M format."""
        end_hex = self.schedule[16:24]
        return tools.hexadecimale_timestamp_to_localtime(end_hex)

get_days() ⚓︎

Return a set of the scheduled Days.

Source code in src/aioswitcher/schedule/parser.py
82
83
84
85
86
87
88
def get_days(self) -> Set[Days]:
    """Return the set of scheduled Days, empty when not recurring."""
    if not self.is_recurring():
        return set()
    weekdays_sum = int(self.schedule[4:6], 16)
    return tools.bit_summary_to_days(weekdays_sum)

get_end_time() ⚓︎

Return the schedule end time in %H:%M format.

Source code in src/aioswitcher/schedule/parser.py
101
102
103
def get_end_time(self) -> str:
    """Return the schedule end time in %H:%M format."""
    end_hex = self.schedule[16:24]
    return tools.hexadecimale_timestamp_to_localtime(end_hex)

get_id() ⚓︎

Return the id of the schedule.

Source code in src/aioswitcher/schedule/parser.py
70
71
72
def get_id(self) -> str:
    """Return the schedule id as a decimal string."""
    id_hex = self.schedule[0:2]
    return str(int(id_hex, 16))

get_start_time() ⚓︎

Return the schedule start time in %H:%M format.

Source code in src/aioswitcher/schedule/parser.py
97
98
99
def get_start_time(self) -> str:
    """Return the schedule start time in %H:%M format."""
    start_hex = self.schedule[8:16]
    return tools.hexadecimale_timestamp_to_localtime(start_hex)

get_state() ⚓︎

Return the current state of the device.

Not sure if this needs to be included in the schedule object.

Source code in src/aioswitcher/schedule/parser.py
90
91
92
93
94
95
def get_state(self) -> ScheduleState:
    """Return the current state of the device.

    Not sure if this needs to be included in the schedule object.
    """
    state_code = self.schedule[6:8].decode()
    return ScheduleState(state_code)

is_enabled() ⚓︎

Return true if enabled.

Source code in src/aioswitcher/schedule/parser.py
74
75
76
def is_enabled(self) -> bool:
    """Return true if the enabled flag equals 1."""
    enabled_flag = int(self.schedule[2:4], 16)
    return enabled_flag == 1

is_recurring() ⚓︎

Return true if a recurring schedule.

Source code in src/aioswitcher/schedule/parser.py
78
79
80
def is_recurring(self) -> bool:
    """Return true if the weekdays field is anything other than "00"."""
    weekdays_hex = self.schedule[4:6]
    return weekdays_hex != b"00"

SwitcherSchedule dataclass ⚓︎

Representation of the Switcher schedule slot.

Parameters:

Name Type Description Default
schedule_id str

the id of the schedule

required
recurring bool

is a recurring schedule

required
days Set[Days]

a set of schedule days, or empty set for non recurring schedules

required
start_time str

the start time of the schedule

required
end_time str

the end time of the schedule

required
Source code in src/aioswitcher/schedule/parser.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@final
@dataclass
class SwitcherSchedule:
    """Representation of the Switcher schedule slot.

    Two schedules are equal, and hash alike, when their ``schedule_id`` match.
    ``duration`` and ``display`` are not accepted by the constructor; they are
    computed right after initialization.

    Args:
        schedule_id: the id of the schedule
        recurring: is a recurring schedule
        days: a set of schedule days, or empty set for non recurring schedules
        start_time: the start time of the schedule
        end_time: the end time of the schedule

    """

    schedule_id: str
    recurring: bool
    days: Set[Days]
    start_time: str
    end_time: str
    duration: str = field(init=False)
    display: str = field(init=False)

    def __post_init__(self) -> None:
        """Derive the duration and display fields from the given values."""
        self.duration = tools.calc_duration(self.start_time, self.end_time)
        self.display = tools.pretty_next_run(self.start_time, self.days)

    def __hash__(self) -> int:
        """Hash on the schedule id only, so instances can live in a set."""
        return hash(self.schedule_id)

    def __eq__(self, obj: object) -> bool:
        """Compare by schedule id; anything but a SwitcherSchedule is unequal."""
        if not isinstance(obj, SwitcherSchedule):
            return False
        return self.schedule_id == obj.schedule_id

__eq__(obj) ⚓︎

For usage with set, implementation of the eq magic method.

Source code in src/aioswitcher/schedule/parser.py
56
57
58
59
60
def __eq__(self, obj: object) -> bool:
    """Compare by schedule id; anything but a SwitcherSchedule is unequal."""
    if not isinstance(obj, SwitcherSchedule):
        return False
    return self.schedule_id == obj.schedule_id

__hash__() ⚓︎

For usage with set, implementation of the hash magic method.

Source code in src/aioswitcher/schedule/parser.py
52
53
54
def __hash__(self) -> int:
    """Hash on the schedule id only, so instances can live in a set."""
    schedule_key = self.schedule_id
    return hash(schedule_key)

__post_init__() ⚓︎

Post initialization, set duration and display.

Source code in src/aioswitcher/schedule/parser.py
47
48
49
50
def __post_init__(self) -> None:
    """Derive the duration and display fields from the given values."""
    start, end = self.start_time, self.end_time
    self.duration = tools.calc_duration(start, end)
    self.display = tools.pretty_next_run(start, self.days)

get_schedules(message) ⚓︎

Create a set of schedules from a response message received from the device.

Source code in src/aioswitcher/schedule/parser.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def get_schedules(message: bytes) -> Set[SwitcherSchedule]:
    """Build the set of schedules found in a response message from the device.

    The message is hexlified, its first 90 and last 8 hex characters are
    dropped, and the rest is cut into 32-character records that are parsed one
    by one.
    """
    payload = hexlify(message)[90:-8].decode()
    parsers = (ScheduleParser(record.encode()) for record in wrap(payload, 32))
    return {
        SwitcherSchedule(
            parser.get_id(),
            parser.is_recurring(),
            parser.get_days(),
            parser.get_start_time(),
            parser.get_end_time(),
        )
        for parser in parsers
    }

Switcher integration schedule module tools.

bit_summary_to_days(sum_weekdays_bit) ⚓︎

Decode a weekdays bit summary to a set of weekdays.

Parameters:

Name Type Description Default
sum_weekdays_bit int

the sum of all weekdays

required
Return

Set of Weekday members decoded from the summary value.

Todo

Should an existing remainder in the sum value throw an error? E.g. 3 will result in a set of MONDAY and the remainder will be 1.

Source code in src/aioswitcher/schedule/tools.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def bit_summary_to_days(sum_weekdays_bit: int) -> Set[Days]:
    """Decode a weekdays bit summary to a set of weekdays.

    Args:
        sum_weekdays_bit: the sum of all weekdays

    Return:
        Set of Weekday members decoded from the summary value.

    Raises:
        ValueError: when the summary is not between 2 and 254.

    Todo:
        Should an existing remainder in the sum value throw an error?
        E.g. 3 will result in a set of MONDAY and the remainder will be 1.

    """
    if not 1 < sum_weekdays_bit < 255:
        raise ValueError("weekdays bit sum should be between 2 and 254")

    # a day is included when its flag shares a set bit with the summary
    return {day for day in Days if day.hex_rep & sum_weekdays_bit}

calc_duration(start_time, end_time) ⚓︎

Calculate the delta between two time values formatted as %H:%M.

Source code in src/aioswitcher/schedule/tools.py
72
73
74
75
76
77
78
def calc_duration(start_time: str, end_time: str) -> str:
    """Calculate the delta between two time values formatted as %H:%M.

    Raises:
        ValueError: when end_time is not later than start_time.

    """
    starts_at = datetime.strptime(start_time, "%H:%M")
    ends_at = datetime.strptime(end_time, "%H:%M")
    if not ends_at > starts_at:
        raise ValueError("end_time should be greater the start_time")
    return str(ends_at - starts_at)

hexadecimale_timestamp_to_localtime(hex_timestamp) ⚓︎

Decode an hexadecimale timestamp to localtime with the format %H:%M.

Parameters:

Name Type Description Default
hex_timestamp bytes

the hexadecimale timestamp.

required
Return

Localtime string with %H:%M format. e.g. "20:30".

Source code in src/aioswitcher/schedule/tools.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def hexadecimale_timestamp_to_localtime(hex_timestamp: bytes) -> str:
    """Decode an hexadecimale timestamp to localtime with the format %H:%M.

    Args:
        hex_timestamp: the hexadecimale timestamp.

    Return:
        Localtime string with %H:%M format. e.g. "20:30".
    """
    # reverse the order of the four two-character pairs before parsing
    reordered = hex_timestamp[6:8]
    for offset in (4, 2, 0):
        reordered += hex_timestamp[offset : offset + 2]

    epoch_seconds = int(reordered, 16)
    return time.strftime("%H:%M", time.localtime(epoch_seconds))

pretty_next_run(start_time, days=set()) ⚓︎

Create a literal for displaying the next run time.

Parameters:

Name Type Description Default
start_time str

the start of the schedule in "%H:%M" format, e.g. "17:00".

required
days Set[Days]

for recurring schedules, the set of days to run on; when empty, the schedule is reported as due today.

set()

Returns:

Type Description
str

A pretty string describing the next due run.

str

e.g. "Due next Sunday at 17:00".

Source code in src/aioswitcher/schedule/tools.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def pretty_next_run(start_time: str, days: Set[Days] = set()) -> str:
    """Create a literal for displaying the next run time.

    Args:
        start_time: the start of the schedule in "%H:%M" format, e.g. "17:00".
        days: for recurring schedules, the set of days to run on; when empty,
            the schedule is reported as due today.

    Returns:
        A pretty string describing the next due run,
        e.g. "Due next Sunday at 17:00".

    """
    if not days:
        return f"Due today at {start_time}"

    # NOTE(review): "now" is taken in UTC while start_time is a bare "%H:%M"
    # value; confirm both are meant to be in the same timezone.
    current_datetime = datetime.utcnow()
    current_weekday = current_datetime.weekday()

    # compare at minute resolution, matching the "%H:%M" schedule format
    current_time = current_datetime.time().replace(second=0, microsecond=0)
    schedule_time = datetime.strptime(start_time, "%H:%M").time()

    execution_days = sorted(d.weekday for d in days)
    # if scheduled for later on today, return "due today"
    if current_weekday in execution_days and current_time < schedule_time:
        return f"Due today at {start_time}"

    # from here on today is either not scheduled or its run has already passed,
    # so only strictly later weekdays qualify; when none is left this week,
    # wrap around to the first scheduled day of next week
    upcoming_days = [d for d in execution_days if d > current_weekday]
    next_exc_day = upcoming_days[0] if upcoming_days else execution_days[0]

    # if next execution day is tomorrow for the current day, or this is the week
    # end (today is sunday and tomorrow is monday) return "due tomorrow"
    if next_exc_day - 1 == current_weekday or (
        next_exc_day == Days.MONDAY.weekday and current_weekday == Days.SUNDAY.weekday
    ):
        return f"Due tomorrow at {start_time}"

    # if here, then the schedule is due some other day, return "due next..."
    weekdays = {d.weekday: d for d in Days}
    return f"Due next {weekdays[next_exc_day].value} at {start_time}"

time_to_hexadecimal_timestamp(time_value) ⚓︎

Convert hours and minutes to a timestamp with the current date and encode.

Parameters:

Name Type Description Default
time_value str

time to convert. e.g. "21:00".

required
Return

Hexadecimal representation of the timestamp.

Source code in src/aioswitcher/schedule/tools.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
def time_to_hexadecimal_timestamp(time_value: str) -> str:
    """Convert hours and minutes to a timestamp with the current date and encode.

    Args:
        time_value: time to convert. e.g. "21:00".

    Return:
        Hexadecimal representation of the timestamp.

    """
    today = time.strftime("%d/%m/%Y")
    parts = time_value.split(":")
    # only the first two parts (hours and minutes) are used
    parsed = time.strptime(f"{today} {parts[0]}:{parts[1]}", "%d/%m/%Y %H:%M")
    epoch_seconds = int(time.mktime(parsed))

    return hexlify(pack("<I", epoch_seconds)).decode()

weekdays_to_hexadecimal(days) ⚓︎

Sum the requested weekdays bit representation and return as hexadecimal value.

Parameters:

Name Type Description Default
days Union[Days, Set[Days]]

the requested Weekday members.

required
Return

Hexadecimale representation of the sum of all requested days.

Source code in src/aioswitcher/schedule/tools.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def weekdays_to_hexadecimal(days: Union[Days, Set[Days]]) -> str:
    """Sum the requested weekdays bit representation and return as hexadecimal value.

    Args:
        days: the requested Weekday members.

    Return:
        Hexadecimale representation of the sum of all requested days.

    Raises:
        ValueError: when days is empty, or is a non-set collection holding
            duplicates.

    """
    if not days:
        raise ValueError("no days requested")

    if type(days) is Days:
        return f"{days.bit_rep:02x}"

    # any collection is accepted as long as it holds no duplicates
    if type(days) is set or len(days) == len(set(days)):  # type: ignore
        bits_total = int(sum(day.bit_rep for day in days))  # type: ignore
        return f"{bits_total:02x}"

    raise ValueError("no days requested")

Last update: 2024-01-18