Skip to content

Code documentation⚓︎

Switcher integration main module.

Modules:

Name Description
api

Switcher integration TCP socket API module.

bridge

Switcher integration, UDP Bridge module.

device

Switcher integration device module.

schedule

Switcher integration schedule module.

Switcher integration TCP socket API module.

Modules:

Name Description
messages

Switcher integration TCP socket API messages.

packets

Switcher integration TCP socket API packet formats.

remotes

Switcher integration API remote related classes and functions.

Command ⚓︎

Bases: Enum

Enum for turning the device on or off.

Source code in src/aioswitcher/api/__init__.py
@unique
class Command(Enum):
    """Enum for turning the device on or off."""

    ON = "1"
    OFF = "0"

SwitcherApi ⚓︎

Switcher TCP based API.

Parameters:

Name Type Description Default
device_type DeviceType

the type of the device.

required
ip_address str

the ip address assigned to the device.

required
device_id str

the id of the desired device.

required
device_key str

the login key of the device.

required

Methods:

Name Description
connect

Connect to asynchronous socket and get reader and writer object.

control_breeze_device

Use for sending the control packet to the Breeze device.

control_device

Use for sending the control packet to the device.

create_schedule

Use for creating a new schedule in the next empty schedule slot.

delete_schedule

Use for deleting a schedule from the device.

disconnect

Disconnect from asynchronous socket.

get_breeze_state

Use for sending the get state packet to the Breeze device.

get_light_state

Use for sending the get state packet to the Light devices.

get_schedules

Use for retrieval of the schedules from the device.

get_shutter_state

Use for sending the get state packet to the Runners devices.

get_state

Use for sending the get state packet to the device.

set_auto_shutdown

Use for sending the set auto-off packet to the device.

set_device_name

Use for sending the set name packet to the device.

set_light

Use for turn on/off light.

set_position

Use for setting the shutter position of the Runners devices.

set_shutter_child_lock

Use for turn on/off shutter child lock.

stop_shutter

Use for stopping the shutter.

Source code in src/aioswitcher/api/__init__.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
@final
class SwitcherApi:
    """Switcher TCP based API.

    Args:
        device_type: the type of the device.
        ip_address: the ip address assigned to the device.
        device_id: the id of the desired device.
        device_key: the login key of the device.

    """

    def __init__(
        self,
        device_type: DeviceType,
        ip_address: str,
        device_id: str,
        device_key: str,
        token: Union[str, None] = None,
    ) -> None:
        """Initialize the Switcher TCP connection API."""
        self._device_type = device_type
        self._ip_address = ip_address
        self._device_id = device_id
        self._device_key = device_key
        self._port = SWITCHER_TCP_PORT_TYPE1
        if device_type.protocol_type == 2:
            self._port = SWITCHER_TCP_PORT_TYPE2
        self._connected = False
        self._token = None
        if self._device_type.token_needed:
            if not token:
                raise RuntimeError("A token is needed but is missing")
            self._token = convert_token_to_packet(str(token))

    @property
    def connected(self) -> bool:
        """Return true if api is connected."""
        return self._connected

    async def __aenter__(self) -> "SwitcherApi":
        """Enter SwitcherApi asynchronous context manager.

        Returns:
            This instance of ``aioswitcher.api.SwitcherApi``.

        """
        await self.connect()
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Exit SwitcherApi asynchronous context manager."""
        await self.disconnect()

    async def connect(self) -> None:
        """Connect to asynchronous socket and get reader and writer object."""
        logger.info("connecting to the switcher device")
        self._reader, self._writer = await open_connection(
            host=self._ip_address,
            port=self._port,
            family=AF_INET,
        )

        self._connected = True
        logger.info("switcher device connected")

    async def disconnect(self) -> None:
        """Disconnect from asynchronous socket."""
        if hasattr(self, "_writer") and self._writer:
            logger.info("disconnecting from the switcher device")
            self._writer.close()
            await self._writer.wait_closed()
        else:
            logger.info("switcher device not connected")
        self._connected = False

    async def _send_packet(self, packet_id: str, packet: str) -> bytes:
        """Sign and send a packet, then read the response.

        Args:
            packet_id (str): The identifier for the packet being sent.
            packet (str): The packet to be sent.

        Returns:
            bytes: The response from the device.
        """
        packet = set_message_length(packet)
        signed_packet = sign_packet_with_crc_key(packet)

        logger.debug(f"sending a {packet_id} packet")
        self._writer.write(unhexlify(signed_packet))
        response = await self._reader.read(1024)
        return response

    async def _login(self) -> Tuple[str, SwitcherLoginResponse]:
        """Use for sending the login packet to the device.

        Returns:
            A tuple of the hex timestamp and an instance of ``SwitcherLoginResponse``.

        Note:
            This is a private function used by other functions, do not call this
            function directly.

        """
        timestamp = current_timestamp_to_hexadecimal()
        if bool(self._token):
            packet = packets.LOGIN_TOKEN_PACKET_TYPE2.format(
                self._token, timestamp, self._device_id
            )
        elif (
            self._device_type == DeviceType.BREEZE
            or self._device_type == DeviceType.RUNNER
            or self._device_type == DeviceType.RUNNER_MINI
        ):
            packet = packets.LOGIN_PACKET_TYPE2.format(timestamp, self._device_id)
        else:
            packet = packets.LOGIN_PACKET_TYPE1.format(timestamp, self._device_key)

        response = await self._send_packet("login", packet)

        if bool(self._token):
            packet = packets.LOGIN2_TOKEN_PACKET_TYPE2.format(
                self._device_id, timestamp, self._token
            )
            response = await self._send_packet("login2", packet)
        return timestamp, SwitcherLoginResponse(response)

    async def get_state(self) -> SwitcherStateResponse:
        """Use for sending the get state packet to the device.

        Returns:
            An instance of ``SwitcherStateResponse``.

        """
        timestamp, login_resp = await self._login()
        if login_resp.successful:
            packet = packets.GET_STATE_PACKET_TYPE1.format(
                login_resp.session_id, timestamp, self._device_id
            )
            state_resp = await self._send_packet("get state", packet)
            try:
                response = SwitcherStateResponse(state_resp)
                if response.successful:
                    return response
            except (KeyError, ValueError) as ve:
                raise RuntimeError("get state request was not successful") from ve
        raise RuntimeError("login request was not successful")

    async def control_device(
        self, command: Command, minutes: int = 0
    ) -> SwitcherBaseResponse:
        """Use for sending the control packet to the device.

        Args:
            command: use the ``aioswitcher.api.Command`` enum.
            minutes: if turning-on optionally incorporate a timer.

        Returns:
            An instance of ``SwitcherBaseResponse``.

        """
        timestamp, login_resp = await self._login()
        timer = (
            minutes_to_hexadecimal_seconds(minutes)
            if minutes > 0
            else packets.NO_TIMER_REQUESTED
        )
        packet = packets.SEND_CONTROL_PACKET.format(
            login_resp.session_id,
            timestamp,
            self._device_id,
            command.value,
            timer,
        )
        response = await self._send_packet("control", packet)
        return SwitcherBaseResponse(response)

    async def set_auto_shutdown(self, full_time: timedelta) -> SwitcherBaseResponse:
        """Use for sending the set auto-off packet to the device.

        Args:
            full_time: timedelta value containing the configuration value for
                auto-shutdown.

        Returns:
            An instance of ``SwitcherBaseResponse``.

        """
        timestamp, login_resp = await self._login()
        auto_shutdown = timedelta_to_hexadecimal_seconds(full_time)
        packet = packets.SET_AUTO_OFF_SET_PACKET.format(
            login_resp.session_id,
            timestamp,
            self._device_id,
            auto_shutdown,
        )
        response = await self._send_packet("set auto shutdown", packet)
        return SwitcherBaseResponse(response)

    async def set_device_name(self, name: str) -> SwitcherBaseResponse:
        """Use for sending the set name packet to the device.

        Args:
            name: string name with the length of 2 >= x >= 32.

        Returns:
            An instance of ``SwitcherBaseResponse``.

        """
        timestamp, login_resp = await self._login()
        device_name = string_to_hexadecimale_device_name(name)
        packet = packets.UPDATE_DEVICE_NAME_PACKET.format(
            login_resp.session_id,
            timestamp,
            self._device_id,
            device_name,
        )
        response = await self._send_packet("set name", packet)
        return SwitcherBaseResponse(response)

    async def get_schedules(self) -> SwitcherGetSchedulesResponse:
        """Use for retrieval of the schedules from the device.

        Returns:
            An instance of ``SwitcherGetSchedulesResponse``.

        """
        timestamp, login_resp = await self._login()
        packet = packets.GET_SCHEDULES_PACKET.format(
            login_resp.session_id,
            timestamp,
            self._device_id,
        )
        response = await self._send_packet("get schedules", packet)
        return SwitcherGetSchedulesResponse(response)

    async def delete_schedule(self, schedule_id: str) -> SwitcherBaseResponse:
        """Use for deleting a schedule from the device.

        Use ``get_schedules`` to retrieve the schedule instance.

        Args:
            schedule_id: the identification of the schedule for deletion.

        Returns:
            An instance of ``SwitcherBaseResponse``.

        """
        timestamp, login_resp = await self._login()
        packet = packets.DELETE_SCHEDULE_PACKET.format(
            login_resp.session_id, timestamp, self._device_id, schedule_id
        )
        response = await self._send_packet("delete schedule", packet)
        return SwitcherBaseResponse(response)

    async def create_schedule(
        self, start_time: str, end_time: str, days: Set[Days] = set()
    ) -> SwitcherBaseResponse:
        """Use for creating a new schedule in the next empty schedule slot.

        Args:
            start_time: a string start time in %H:%M format. e.g. 13:00.
            end_time: a string start time in %H:%M format. e.g. 13:00.
            days: for recurring schedules, add ``Days``.

        Returns:
            An instance of ``SwitcherBaseResponse``.

        """
        timestamp, login_resp = await self._login()

        start_time_hex = time_to_hexadecimal_timestamp(start_time)
        end_time_hex = time_to_hexadecimal_timestamp(end_time)
        weekdays = (
            weekdays_to_hexadecimal(days)
            if len(days) > 0
            else packets.NON_RECURRING_SCHEDULE
        )
        new_schedule = packets.SCHEDULE_CREATE_DATA_FORMAT.format(
            weekdays, start_time_hex, end_time_hex
        )
        packet = packets.CREATE_SCHEDULE_PACKET.format(
            login_resp.session_id,
            timestamp,
            self._device_id,
            new_schedule,
        )
        response = await self._send_packet("create schedule", packet)
        return SwitcherBaseResponse(response)

    async def control_breeze_device(
        self,
        remote: SwitcherBreezeRemote,
        state: Union[DeviceState, None] = None,
        mode: Union[ThermostatMode, None] = None,
        target_temp: int = 0,
        fan_level: Union[ThermostatFanLevel, None] = None,
        swing: Union[ThermostatSwing, None] = None,
        update_state: bool = False,
    ) -> SwitcherBaseResponse:
        """Use for sending the control packet to the Breeze device.

        Args:
            remote: the remote for the breeze device
            state: optionally the desired state of the device
            mode: optionally the desired mode of the device
            target_temp: optionally the target temperature
            fan_level: optionally the desired fan level
            swing: optionally the desired swing state
            update_state: update the device state without controlling the device

        Returns:
            An instance of ``SwitcherBaseResponse``.

        """
        timestamp, login_resp = await self._login()
        if not login_resp.successful:
            logger.error("Failed to log into device id %s", self._device_id)
            raise RuntimeError("login request was not successful")

        logger.debug(
            "logged in session_id=%s, timestamp=%s", login_resp.session_id, timestamp
        )

        cmd_response: Union[SwitcherBaseResponse, None] = None
        if (
            state
            or mode
            or target_temp
            or fan_level
            or (swing and not remote._separated_swing_command)
        ):
            current_state = await self._get_breeze_state(timestamp, login_resp)
            if not current_state.successful:
                raise RuntimeError("get state request was not successful")

            logger.debug("got current breeze device state")

            state = state or current_state.state
            mode = mode or current_state.mode
            target_temp = target_temp or current_state.target_temperature
            fan_level = fan_level or current_state.fan_level
            set_swing = swing or current_state.swing
            if remote._separated_swing_command:
                set_swing = ThermostatSwing.OFF
            if update_state:
                packet = packets.BREEZE_UPDATE_STATUS_PACKET.format(
                    login_resp.session_id,
                    timestamp,
                    self._device_id,
                    state.value,
                    mode.value,
                    target_temp,
                    fan_level.value,
                    set_swing.value,
                )
                response = await self._send_packet("set status", packet)
            else:
                command = remote.build_command(
                    state, mode, target_temp, fan_level, set_swing, current_state.state
                )

                packet = packets.BREEZE_COMMAND_PACKET.format(
                    login_resp.session_id,
                    timestamp,
                    self._device_id,
                    command.length,
                    command.command,
                )
                response = await self._send_packet("control", packet)
            cmd_response = SwitcherBaseResponse(response)

            if not cmd_response.successful:
                raise RuntimeError("set state request was not successful")

        if remote._separated_swing_command and swing and not update_state:
            # if device is SPECIAL SWING device and user requested a swing change
            cmd_response = await self._control_breeze_swing_device(
                timestamp, login_resp.session_id, remote, swing
            )

        if cmd_response:
            return cmd_response
        raise RuntimeError("control breeze device failed")

    async def _control_breeze_swing_device(
        self,
        timestamp: str,
        session_id: str,
        remote: SwitcherBreezeRemote,
        swing: ThermostatSwing,
    ) -> SwitcherBaseResponse:
        """Use for sending the control packet to the Breeze device.

        Args:
            timestamp: the timestamp from the login response
            session_id: the session_id from the login response
            remote: the remote for the breeze device
            swing: the desired swing state

        Returns:
            An instance of ``SwitcherBaseResponse``.

        """
        logger.debug("about to send Breeze special swing command")
        command = remote.build_swing_command(swing)
        packet = packets.BREEZE_COMMAND_PACKET.format(
            session_id,
            timestamp,
            self._device_id,
            command.length,
            command.command,
        )

        response = await self._send_packet("control", packet)
        return SwitcherBaseResponse(response)

    async def stop_shutter(self, index: int = 0) -> SwitcherBaseResponse:
        """Use for stopping the shutter.

        Args:
            index: which runner to stop position, default to 0.

        Returns:
            An instance of ``SwitcherBaseResponse``.

        """
        index_packet = get_shutter_api_packet_index(self._device_type, index)
        logger.debug("about to send stop shutter command")
        timestamp, login_resp = await self._login()
        if not login_resp.successful:
            logger.error("Failed to log into device with id %s", self._device_id)
            raise RuntimeError("login request was not successful")

        logger.debug(
            "logged in session_id=%s, timestamp=%s", login_resp.session_id, timestamp
        )

        if bool(self._token):
            command = "0000"
            hex_pos = f"0{index_packet}{command}"

            packet = packets.GENERAL_TOKEN_COMMAND.format(
                timestamp,
                self._device_id,
                self._token,
                packets.STOP_SHUTTER_PRECOMMAND,
                hex_pos,
            )
        else:
            packet = packets.RUNNER_STOP_COMMAND.format(
                login_resp.session_id, timestamp, self._device_id
            )

        response = await self._send_packet("stop control", packet)
        return SwitcherBaseResponse(response)

    async def set_position(
        self, position: int = 0, index: int = 0
    ) -> SwitcherBaseResponse:
        """Use for setting the shutter position of the Runners devices.

        Args:
            position: the position to set the device to, default to 0.
            index: which runner to set position, default to 0.

        Returns:
            An instance of ``SwitcherBaseResponse``.

        """
        index_packet = get_shutter_api_packet_index(self._device_type, index)
        hex_pos = "{0:0{1}x}".format(position, 2)

        logger.debug("about to send set position command")
        timestamp, login_resp = await self._login()
        if not login_resp.successful:
            logger.error("Failed to log into device with id %s", self._device_id)
            raise RuntimeError("login request was not successful")

        logger.debug(
            "logged in session_id=%s, timestamp=%s", login_resp.session_id, timestamp
        )

        if bool(self._token):
            hex_pos = f"0{index_packet}{hex_pos}"

            packet = packets.GENERAL_TOKEN_COMMAND.format(
                timestamp,
                self._device_id,
                self._token,
                packets.SET_POSITION_PRECOMMAND,
                hex_pos,
            )
        else:
            packet = packets.RUNNER_SET_POSITION.format(
                login_resp.session_id, timestamp, self._device_id, hex_pos
            )

        response = await self._send_packet("control", packet)
        return SwitcherBaseResponse(response)

    async def set_shutter_child_lock(
        self, command: ShutterChildLock, index: int = 0
    ) -> SwitcherBaseResponse:
        """Use for turn on/off shutter child lock.

        Args:
            command: use the ``aioswitcher.api.ShutterChildLock`` enum.
            index: which shutter child lock to turn on/off, default to 0.

        Returns:
            An instance of ``SwitcherBaseResponse``.

        """
        index_packet = get_shutter_api_packet_index(self._device_type, index)
        hex_pos = command.value

        logger.debug("about to send set shutter child lock command")
        timestamp, login_resp = await self._login()
        if not login_resp.successful:
            logger.error("Failed to log into device with id %s", self._device_id)
            raise RuntimeError("login request was not successful")

        logger.debug(
            "logged in session_id=%s, timestamp=%s", login_resp.session_id, timestamp
        )

        if bool(self._token):
            hex_pos = f"0{index_packet}{command.value}"
            packet = packets.GENERAL_TOKEN_COMMAND.format(
                timestamp,
                self._device_id,
                self._token,
                packets.SET_CHILD_LOCK_PRECOMMAND,
                hex_pos,
            )
        else:
            packet = packets.RUNNER_SET_CHILD_LOCK.format(
                login_resp.session_id, timestamp, self._device_id, hex_pos
            )

        response = await self._send_packet("control", packet)
        return SwitcherBaseResponse(response)

    async def get_breeze_state(self) -> SwitcherThermostatStateResponse:
        """Use for sending the get state packet to the Breeze device.

        Returns:
            An instance of ``SwitcherThermostatStateResponse``.

        """
        timestamp, login_resp = await self._login()
        if login_resp.successful:
            return await self._get_breeze_state(timestamp, login_resp)
        raise RuntimeError("login request was not successful")

    async def _get_breeze_state(
        self, timestamp: str, login_resp: SwitcherLoginResponse
    ) -> SwitcherThermostatStateResponse:
        packet = packets.GET_STATE_PACKET2_TYPE2.format(
            login_resp.session_id, timestamp, self._device_id
        )

        state_resp = await self._send_packet("get state", packet)
        try:
            response = SwitcherThermostatStateResponse(state_resp)
            return response
        except (KeyError, ValueError) as ve:
            raise RuntimeError("get breeze state request was not successful") from ve

    async def get_shutter_state(self, index: int = 0) -> SwitcherShutterStateResponse:
        """Use for sending the get state packet to the Runners devices.

        Args:
            index: which runner to set get state, default to 0.

        Returns:
            An instance of ``SwitcherShutterStateResponse``.

        """
        timestamp, login_resp = await self._login()
        if login_resp.successful:
            packet = packets.GET_STATE_PACKET2_TYPE2.format(
                login_resp.session_id, timestamp, self._device_id
            )

            state_resp = await self._send_packet("get state", packet)
            try:
                response = SwitcherShutterStateResponse(
                    state_resp, self._device_type, index
                )
                return response
            except (KeyError, ValueError) as ve:
                raise RuntimeError(
                    "get shutter state request was not successful"
                ) from ve
        raise RuntimeError("login request was not successful")

    async def get_light_state(self, index: int = 0) -> SwitcherLightStateResponse:
        """Use for sending the get state packet to the Light devices.

        Args:
            index: which light to set get state, default to 0.

        Returns:
            An instance of ``SwitcherLightStateResponse``.

        """
        timestamp, login_resp = await self._login()
        if login_resp.successful:
            packet = packets.GET_STATE_PACKET2_TYPE2.format(
                login_resp.session_id, timestamp, self._device_id
            )

            state_resp = await self._send_packet("get state", packet)
            try:
                response = SwitcherLightStateResponse(
                    state_resp, self._device_type, index
                )
                return response
            except (KeyError, ValueError) as ve:
                raise RuntimeError("get light state request was not successful") from ve
        raise RuntimeError("login request was not successful")

    async def set_light(
        self, command: DeviceState, index: int = 0
    ) -> SwitcherBaseResponse:
        """Use for turn on/off light.

        Args:
            command: use the ``aioswitcher.api.DeviceState`` enum.
            index: which light to turn on/off, default to 0.

        Returns:
            An instance of ``SwitcherBaseResponse``.

        """
        index_packet = get_light_api_packet_index(self._device_type, index)
        hex_pos = f"0{index_packet}{command.value}"

        logger.debug("about to send set light command")
        timestamp, login_resp = await self._login()
        if not login_resp.successful:
            logger.error("Failed to log into device with id %s", self._device_id)
            raise RuntimeError("login request was not successful")

        logger.debug(
            "logged in session_id=%s, timestamp=%s", login_resp.session_id, timestamp
        )

        if bool(self._token):
            packet = packets.GENERAL_TOKEN_COMMAND.format(
                timestamp,
                self._device_id,
                self._token,
                packets.SET_LIGHT_PRECOMMAND,
                hex_pos,
            )
        else:
            logger.error("Failed to set light device with id %s", self._device_id)
            raise RuntimeError("a token is needed but missing or not valid")

        response = await self._send_packet("control", packet)
        return SwitcherBaseResponse(response)

connected: bool property ⚓︎

Return true if api is connected.

connect() async ⚓︎

Connect to asynchronous socket and get reader and writer object.

Source code in src/aioswitcher/api/__init__.py
async def connect(self) -> None:
    """Connect to asynchronous socket and get reader and writer object."""
    logger.info("connecting to the switcher device")
    self._reader, self._writer = await open_connection(
        host=self._ip_address,
        port=self._port,
        family=AF_INET,
    )

    self._connected = True
    logger.info("switcher device connected")

control_breeze_device(remote, state=None, mode=None, target_temp=0, fan_level=None, swing=None, update_state=False) async ⚓︎

Use for sending the control packet to the Breeze device.

Parameters:

Name Type Description Default
remote SwitcherBreezeRemote

the remote for the breeze device

required
state Union[DeviceState, None]

optionally the desired state of the device

None
mode Union[ThermostatMode, None]

optionally the desired mode of the device

None
target_temp int

optionally the target temperature

0
fan_level Union[ThermostatFanLevel, None]

optionally the desired fan level

None
swing Union[ThermostatSwing, None]

optionally the desired swing state

None
update_state bool

update the device state without controlling the device

False

Returns:

Type Description
SwitcherBaseResponse

An instance of SwitcherBaseResponse.

Source code in src/aioswitcher/api/__init__.py
async def control_breeze_device(
    self,
    remote: SwitcherBreezeRemote,
    state: Union[DeviceState, None] = None,
    mode: Union[ThermostatMode, None] = None,
    target_temp: int = 0,
    fan_level: Union[ThermostatFanLevel, None] = None,
    swing: Union[ThermostatSwing, None] = None,
    update_state: bool = False,
) -> SwitcherBaseResponse:
    """Use for sending the control packet to the Breeze device.

    Args:
        remote: the remote for the breeze device
        state: optionally the desired state of the device
        mode: optionally the desired mode of the device
        target_temp: optionally the target temperature
        fan_level: optionally the desired fan level
        swing: optionally the desired swing state
        update_state: update the device state without controlling the device

    Returns:
        An instance of ``SwitcherBaseResponse``.

    """
    timestamp, login_resp = await self._login()
    if not login_resp.successful:
        logger.error("Failed to log into device id %s", self._device_id)
        raise RuntimeError("login request was not successful")

    logger.debug(
        "logged in session_id=%s, timestamp=%s", login_resp.session_id, timestamp
    )

    cmd_response: Union[SwitcherBaseResponse, None] = None
    if (
        state
        or mode
        or target_temp
        or fan_level
        or (swing and not remote._separated_swing_command)
    ):
        current_state = await self._get_breeze_state(timestamp, login_resp)
        if not current_state.successful:
            raise RuntimeError("get state request was not successful")

        logger.debug("got current breeze device state")

        state = state or current_state.state
        mode = mode or current_state.mode
        target_temp = target_temp or current_state.target_temperature
        fan_level = fan_level or current_state.fan_level
        set_swing = swing or current_state.swing
        if remote._separated_swing_command:
            set_swing = ThermostatSwing.OFF
        if update_state:
            packet = packets.BREEZE_UPDATE_STATUS_PACKET.format(
                login_resp.session_id,
                timestamp,
                self._device_id,
                state.value,
                mode.value,
                target_temp,
                fan_level.value,
                set_swing.value,
            )
            response = await self._send_packet("set status", packet)
        else:
            command = remote.build_command(
                state, mode, target_temp, fan_level, set_swing, current_state.state
            )

            packet = packets.BREEZE_COMMAND_PACKET.format(
                login_resp.session_id,
                timestamp,
                self._device_id,
                command.length,
                command.command,
            )
            response = await self._send_packet("control", packet)
        cmd_response = SwitcherBaseResponse(response)

        if not cmd_response.successful:
            raise RuntimeError("set state request was not successful")

    if remote._separated_swing_command and swing and not update_state:
        # if device is SPECIAL SWING device and user requested a swing change
        cmd_response = await self._control_breeze_swing_device(
            timestamp, login_resp.session_id, remote, swing
        )

    if cmd_response:
        return cmd_response
    raise RuntimeError("control breeze device failed")

control_device(command, minutes=0) async ⚓︎

Use for sending the control packet to the device.

Parameters:

Name Type Description Default
command Command

use the aioswitcher.api.Command enum.

required
minutes int

if turning-on optionally incorporate a timer.

0

Returns:

Type Description
SwitcherBaseResponse

An instance of SwitcherBaseResponse.

Source code in src/aioswitcher/api/__init__.py
async def control_device(
    self, command: Command, minutes: int = 0
) -> SwitcherBaseResponse:
    """Use for sending the control packet to the device.

    Args:
        command: use the ``aioswitcher.api.Command`` enum.
        minutes: if turning-on optionally incorporate a timer.

    Returns:
        An instance of ``SwitcherBaseResponse``.

    """
    timestamp, login_resp = await self._login()
    timer = (
        minutes_to_hexadecimal_seconds(minutes)
        if minutes > 0
        else packets.NO_TIMER_REQUESTED
    )
    packet = packets.SEND_CONTROL_PACKET.format(
        login_resp.session_id,
        timestamp,
        self._device_id,
        command.value,
        timer,
    )
    response = await self._send_packet("control", packet)
    return SwitcherBaseResponse(response)

create_schedule(start_time, end_time, days=set()) async ⚓︎

Use for creating a new schedule in the next empty schedule slot.

Parameters:

Name Type Description Default
start_time str

a string start time in %H:%M format. e.g. 13:00.

required
end_time str

a string start time in %H:%M format. e.g. 13:00.

required
days Set[Days]

for recurring schedules, add Days.

set()

Returns:

Type Description
SwitcherBaseResponse

An instance of SwitcherBaseResponse.

Source code in src/aioswitcher/api/__init__.py
async def create_schedule(
    self, start_time: str, end_time: str, days: Set[Days] = set()
) -> SwitcherBaseResponse:
    """Use for creating a new schedule in the next empty schedule slot.

    Args:
        start_time: a string start time in %H:%M format. e.g. 13:00.
        end_time: a string start time in %H:%M format. e.g. 13:00.
        days: for recurring schedules, add ``Days``.

    Returns:
        An instance of ``SwitcherBaseResponse``.

    """
    timestamp, login_resp = await self._login()

    start_time_hex = time_to_hexadecimal_timestamp(start_time)
    end_time_hex = time_to_hexadecimal_timestamp(end_time)
    weekdays = (
        weekdays_to_hexadecimal(days)
        if len(days) > 0
        else packets.NON_RECURRING_SCHEDULE
    )
    new_schedule = packets.SCHEDULE_CREATE_DATA_FORMAT.format(
        weekdays, start_time_hex, end_time_hex
    )
    packet = packets.CREATE_SCHEDULE_PACKET.format(
        login_resp.session_id,
        timestamp,
        self._device_id,
        new_schedule,
    )
    response = await self._send_packet("create schedule", packet)
    return SwitcherBaseResponse(response)

delete_schedule(schedule_id) async ⚓︎

Use for deleting a schedule from the device.

Use get_schedules to retrieve the schedule instance.

Parameters:

Name Type Description Default
schedule_id str

the identification of the schedule for deletion.

required

Returns:

Type Description
SwitcherBaseResponse

An instance of SwitcherBaseResponse.

Source code in src/aioswitcher/api/__init__.py
async def delete_schedule(self, schedule_id: str) -> SwitcherBaseResponse:
    """Use for deleting a schedule from the device.

    Use ``get_schedules`` to retrieve the schedule instance.

    Args:
        schedule_id: the identification of the schedule for deletion.

    Returns:
        An instance of ``SwitcherBaseResponse``.

    """
    timestamp, login_resp = await self._login()
    packet = packets.DELETE_SCHEDULE_PACKET.format(
        login_resp.session_id, timestamp, self._device_id, schedule_id
    )
    response = await self._send_packet("delete schedule", packet)
    return SwitcherBaseResponse(response)

disconnect() async ⚓︎

Disconnect from asynchronous socket.

Source code in src/aioswitcher/api/__init__.py
async def disconnect(self) -> None:
    """Disconnect from asynchronous socket."""
    if hasattr(self, "_writer") and self._writer:
        logger.info("disconnecting from the switcher device")
        self._writer.close()
        await self._writer.wait_closed()
    else:
        logger.info("switcher device not connected")
    self._connected = False

get_breeze_state() async ⚓︎

Use for sending the get state packet to the Breeze device.

Returns:

Type Description
SwitcherThermostatStateResponse

An instance of SwitcherThermostatStateResponse.

Source code in src/aioswitcher/api/__init__.py
async def get_breeze_state(self) -> SwitcherThermostatStateResponse:
    """Use for sending the get state packet to the Breeze device.

    Returns:
        An instance of ``SwitcherThermostatStateResponse``.

    """
    timestamp, login_resp = await self._login()
    if login_resp.successful:
        return await self._get_breeze_state(timestamp, login_resp)
    raise RuntimeError("login request was not successful")

get_light_state(index=0) async ⚓︎

Use for sending the get state packet to the Light devices.

Parameters:

Name Type Description Default
index int

which light to set get state, default to 0.

0

Returns:

Type Description
SwitcherLightStateResponse

An instance of SwitcherLightStateResponse.

Source code in src/aioswitcher/api/__init__.py
async def get_light_state(self, index: int = 0) -> SwitcherLightStateResponse:
    """Use for sending the get state packet to the Light devices.

    Args:
        index: which light to set get state, default to 0.

    Returns:
        An instance of ``SwitcherLightStateResponse``.

    """
    timestamp, login_resp = await self._login()
    if login_resp.successful:
        packet = packets.GET_STATE_PACKET2_TYPE2.format(
            login_resp.session_id, timestamp, self._device_id
        )

        state_resp = await self._send_packet("get state", packet)
        try:
            response = SwitcherLightStateResponse(
                state_resp, self._device_type, index
            )
            return response
        except (KeyError, ValueError) as ve:
            raise RuntimeError("get light state request was not successful") from ve
    raise RuntimeError("login request was not successful")

get_schedules() async ⚓︎

Use for retrieval of the schedules from the device.

Returns:

Type Description
SwitcherGetSchedulesResponse

An instance of SwitcherGetSchedulesResponse.

Source code in src/aioswitcher/api/__init__.py
async def get_schedules(self) -> SwitcherGetSchedulesResponse:
    """Use for retrieval of the schedules from the device.

    Returns:
        An instance of ``SwitcherGetSchedulesResponse``.

    """
    timestamp, login_resp = await self._login()
    packet = packets.GET_SCHEDULES_PACKET.format(
        login_resp.session_id,
        timestamp,
        self._device_id,
    )
    response = await self._send_packet("get schedules", packet)
    return SwitcherGetSchedulesResponse(response)

get_shutter_state(index=0) async ⚓︎

Use for sending the get state packet to the Runners devices.

Parameters:

Name Type Description Default
index int

which runner to set get state, default to 0.

0

Returns:

Type Description
SwitcherShutterStateResponse

An instance of SwitcherShutterStateResponse.

Source code in src/aioswitcher/api/__init__.py
async def get_shutter_state(self, index: int = 0) -> SwitcherShutterStateResponse:
    """Use for sending the get state packet to the Runners devices.

    Args:
        index: which runner to set get state, default to 0.

    Returns:
        An instance of ``SwitcherShutterStateResponse``.

    """
    timestamp, login_resp = await self._login()
    if login_resp.successful:
        packet = packets.GET_STATE_PACKET2_TYPE2.format(
            login_resp.session_id, timestamp, self._device_id
        )

        state_resp = await self._send_packet("get state", packet)
        try:
            response = SwitcherShutterStateResponse(
                state_resp, self._device_type, index
            )
            return response
        except (KeyError, ValueError) as ve:
            raise RuntimeError(
                "get shutter state request was not successful"
            ) from ve
    raise RuntimeError("login request was not successful")

get_state() async ⚓︎

Use for sending the get state packet to the device.

Returns:

Type Description
SwitcherStateResponse

An instance of SwitcherStateResponse.

Source code in src/aioswitcher/api/__init__.py
async def get_state(self) -> SwitcherStateResponse:
    """Use for sending the get state packet to the device.

    Returns:
        An instance of ``SwitcherStateResponse``.

    """
    timestamp, login_resp = await self._login()
    if login_resp.successful:
        packet = packets.GET_STATE_PACKET_TYPE1.format(
            login_resp.session_id, timestamp, self._device_id
        )
        state_resp = await self._send_packet("get state", packet)
        try:
            response = SwitcherStateResponse(state_resp)
            if response.successful:
                return response
        except (KeyError, ValueError) as ve:
            raise RuntimeError("get state request was not successful") from ve
    raise RuntimeError("login request was not successful")

set_auto_shutdown(full_time) async ⚓︎

Use for sending the set auto-off packet to the device.

Parameters:

Name Type Description Default
full_time timedelta

timedelta value containing the configuration value for auto-shutdown.

required

Returns:

Type Description
SwitcherBaseResponse

An instance of SwitcherBaseResponse.

Source code in src/aioswitcher/api/__init__.py
async def set_auto_shutdown(self, full_time: timedelta) -> SwitcherBaseResponse:
    """Use for sending the set auto-off packet to the device.

    Args:
        full_time: timedelta value containing the configuration value for
            auto-shutdown.

    Returns:
        An instance of ``SwitcherBaseResponse``.

    """
    timestamp, login_resp = await self._login()
    auto_shutdown = timedelta_to_hexadecimal_seconds(full_time)
    packet = packets.SET_AUTO_OFF_SET_PACKET.format(
        login_resp.session_id,
        timestamp,
        self._device_id,
        auto_shutdown,
    )
    response = await self._send_packet("set auto shutdown", packet)
    return SwitcherBaseResponse(response)

set_device_name(name) async ⚓︎

Use for sending the set name packet to the device.

Parameters:

Name Type Description Default
name str

string name with the length of 2 >= x >= 32.

required

Returns:

Type Description
SwitcherBaseResponse

An instance of SwitcherBaseResponse.

Source code in src/aioswitcher/api/__init__.py
async def set_device_name(self, name: str) -> SwitcherBaseResponse:
    """Use for sending the set name packet to the device.

    Args:
        name: string name with the length of 2 >= x >= 32.

    Returns:
        An instance of ``SwitcherBaseResponse``.

    """
    timestamp, login_resp = await self._login()
    device_name = string_to_hexadecimale_device_name(name)
    packet = packets.UPDATE_DEVICE_NAME_PACKET.format(
        login_resp.session_id,
        timestamp,
        self._device_id,
        device_name,
    )
    response = await self._send_packet("set name", packet)
    return SwitcherBaseResponse(response)

set_light(command, index=0) async ⚓︎

Use for turn on/off light.

Parameters:

Name Type Description Default
command DeviceState

use the aioswitcher.api.DeviceState enum.

required
index int

which light to turn on/off, default to 0.

0

Returns:

Type Description
SwitcherBaseResponse

An instance of SwitcherBaseResponse.

Source code in src/aioswitcher/api/__init__.py
async def set_light(
    self, command: DeviceState, index: int = 0
) -> SwitcherBaseResponse:
    """Use for turn on/off light.

    Args:
        command: use the ``aioswitcher.api.DeviceState`` enum.
        index: which light to turn on/off, default to 0.

    Returns:
        An instance of ``SwitcherBaseResponse``.

    """
    index_packet = get_light_api_packet_index(self._device_type, index)
    hex_pos = f"0{index_packet}{command.value}"

    logger.debug("about to send set light command")
    timestamp, login_resp = await self._login()
    if not login_resp.successful:
        logger.error("Failed to log into device with id %s", self._device_id)
        raise RuntimeError("login request was not successful")

    logger.debug(
        "logged in session_id=%s, timestamp=%s", login_resp.session_id, timestamp
    )

    if bool(self._token):
        packet = packets.GENERAL_TOKEN_COMMAND.format(
            timestamp,
            self._device_id,
            self._token,
            packets.SET_LIGHT_PRECOMMAND,
            hex_pos,
        )
    else:
        logger.error("Failed to set light device with id %s", self._device_id)
        raise RuntimeError("a token is needed but missing or not valid")

    response = await self._send_packet("control", packet)
    return SwitcherBaseResponse(response)

set_position(position=0, index=0) async ⚓︎

Use for setting the shutter position of the Runners devices.

Parameters:

Name Type Description Default
position int

the position to set the device to, default to 0.

0
index int

which runner to set position, default to 0.

0

Returns:

Type Description
SwitcherBaseResponse

An instance of SwitcherBaseResponse.

Source code in src/aioswitcher/api/__init__.py
async def set_position(
    self, position: int = 0, index: int = 0
) -> SwitcherBaseResponse:
    """Use for setting the shutter position of the Runners devices.

    Args:
        position: the position to set the device to, default to 0.
        index: which runner to set position, default to 0.

    Returns:
        An instance of ``SwitcherBaseResponse``.

    """
    index_packet = get_shutter_api_packet_index(self._device_type, index)
    hex_pos = "{0:0{1}x}".format(position, 2)

    logger.debug("about to send set position command")
    timestamp, login_resp = await self._login()
    if not login_resp.successful:
        logger.error("Failed to log into device with id %s", self._device_id)
        raise RuntimeError("login request was not successful")

    logger.debug(
        "logged in session_id=%s, timestamp=%s", login_resp.session_id, timestamp
    )

    if bool(self._token):
        hex_pos = f"0{index_packet}{hex_pos}"

        packet = packets.GENERAL_TOKEN_COMMAND.format(
            timestamp,
            self._device_id,
            self._token,
            packets.SET_POSITION_PRECOMMAND,
            hex_pos,
        )
    else:
        packet = packets.RUNNER_SET_POSITION.format(
            login_resp.session_id, timestamp, self._device_id, hex_pos
        )

    response = await self._send_packet("control", packet)
    return SwitcherBaseResponse(response)

set_shutter_child_lock(command, index=0) async ⚓︎

Use for turn on/off shutter child lock.

Parameters:

Name Type Description Default
command ShutterChildLock

use the aioswitcher.api.ShutterChildLock enum.

required
index int

which shutter child lock to turn on/off, default to 0.

0

Returns:

Type Description
SwitcherBaseResponse

An instance of SwitcherBaseResponse.

Source code in src/aioswitcher/api/__init__.py
async def set_shutter_child_lock(
    self, command: ShutterChildLock, index: int = 0
) -> SwitcherBaseResponse:
    """Use for turn on/off shutter child lock.

    Args:
        command: use the ``aioswitcher.api.ShutterChildLock`` enum.
        index: which shutter child lock to turn on/off, default to 0.

    Returns:
        An instance of ``SwitcherBaseResponse``.

    """
    index_packet = get_shutter_api_packet_index(self._device_type, index)
    hex_pos = command.value

    logger.debug("about to send set shutter child lock command")
    timestamp, login_resp = await self._login()
    if not login_resp.successful:
        logger.error("Failed to log into device with id %s", self._device_id)
        raise RuntimeError("login request was not successful")

    logger.debug(
        "logged in session_id=%s, timestamp=%s", login_resp.session_id, timestamp
    )

    if bool(self._token):
        hex_pos = f"0{index_packet}{command.value}"
        packet = packets.GENERAL_TOKEN_COMMAND.format(
            timestamp,
            self._device_id,
            self._token,
            packets.SET_CHILD_LOCK_PRECOMMAND,
            hex_pos,
        )
    else:
        packet = packets.RUNNER_SET_CHILD_LOCK.format(
            login_resp.session_id, timestamp, self._device_id, hex_pos
        )

    response = await self._send_packet("control", packet)
    return SwitcherBaseResponse(response)

stop_shutter(index=0) async ⚓︎

Use for stopping the shutter.

Parameters:

Name Type Description Default
index int

which runner to stop position, default to 0.

0

Returns:

Type Description
SwitcherBaseResponse

An instance of SwitcherBaseResponse.

Source code in src/aioswitcher/api/__init__.py
async def stop_shutter(self, index: int = 0) -> SwitcherBaseResponse:
    """Use for stopping the shutter.

    Args:
        index: which runner to stop position, default to 0.

    Returns:
        An instance of ``SwitcherBaseResponse``.

    """
    index_packet = get_shutter_api_packet_index(self._device_type, index)
    logger.debug("about to send stop shutter command")
    timestamp, login_resp = await self._login()
    if not login_resp.successful:
        logger.error("Failed to log into device with id %s", self._device_id)
        raise RuntimeError("login request was not successful")

    logger.debug(
        "logged in session_id=%s, timestamp=%s", login_resp.session_id, timestamp
    )

    if bool(self._token):
        command = "0000"
        hex_pos = f"0{index_packet}{command}"

        packet = packets.GENERAL_TOKEN_COMMAND.format(
            timestamp,
            self._device_id,
            self._token,
            packets.STOP_SHUTTER_PRECOMMAND,
            hex_pos,
        )
    else:
        packet = packets.RUNNER_STOP_COMMAND.format(
            login_resp.session_id, timestamp, self._device_id
        )

    response = await self._send_packet("stop control", packet)
    return SwitcherBaseResponse(response)

Switcher integration TCP socket API messages.

StateMessageParser dataclass ⚓︎

Use for parsing api messages.

Methods:

Name Description
get_auto_shutdown

Return the value of the auto shutdown configuration.

get_light_state

Return the current light state.

get_power_consumption

Return the current power consumption of the device.

get_shutter_child_lock

Return the current shutter child lock.

get_shutter_direction

Return the current shutter direction.

get_shutter_position

Return the current shutter position.

get_state

Return the current device state.

get_thermostat_fan_level

Return the current thermostat fan level.

get_thermostat_mode

Return the current thermostat mode.

get_thermostat_remote_id

Return the current thermostat remote.

get_thermostat_state

Return the current thermostat state.

get_thermostat_swing

Return the current thermostat fan swing.

get_thermostat_target_temp

Return the current temperature of the thermostat.

get_thermostat_temp

Return the current temp of the thermostat.

get_time_left

Return the time left for the device current run.

get_time_on

Return how long the device has been on.

Source code in src/aioswitcher/api/messages.py
@final
@dataclass
class StateMessageParser:
    """Use for parsing api messages."""

    response: InitVar[bytes]

    def __post_init__(self, response: bytes) -> None:
        """Post initialization of the parser."""
        self._hex_response = hexlify(response)

    def get_power_consumption(self) -> int:
        """Return the current power consumption of the device."""
        hex_power = self._hex_response[154:162]
        return int(hex_power[2:4] + hex_power[0:2], 16)

    def get_time_left(self) -> str:
        """Return the time left for the device current run."""
        hex_time_left = self._hex_response[178:186]
        time_left_seconds = int(
            hex_time_left[6:8]
            + hex_time_left[4:6]
            + hex_time_left[2:4]
            + hex_time_left[0:2],
            16,
        )
        return seconds_to_iso_time(time_left_seconds)

    def get_time_on(self) -> str:
        """Return how long the device has been on."""
        hex_time_on = self._hex_response[186:194]
        time_on_seconds = int(
            hex_time_on[6:8] + hex_time_on[4:6] + hex_time_on[2:4] + hex_time_on[0:2],
            16,
        )
        return seconds_to_iso_time(time_on_seconds)

    def get_auto_shutdown(self) -> str:
        """Return the value of the auto shutdown configuration."""
        hex_auto_off = self._hex_response[194:202]
        auto_off_seconds = int(
            hex_auto_off[6:8]
            + hex_auto_off[4:6]
            + hex_auto_off[2:4]
            + hex_auto_off[0:2],
            16,
        )
        return seconds_to_iso_time(auto_off_seconds)

    def get_state(self) -> DeviceState:
        """Return the current device state."""
        hex_state = self._hex_response[150:152].decode()
        states = dict(map(lambda s: (s.value, s), DeviceState))
        return states[hex_state]

    def get_thermostat_state(self) -> DeviceState:
        """Return the current thermostat state."""
        hex_power = self._hex_response[156:158].decode()
        return DeviceState.OFF if hex_power == DeviceState.OFF.value else DeviceState.ON

    def get_thermostat_mode(self) -> ThermostatMode:
        """Return the current thermostat mode."""
        hex_mode = self._hex_response[158:160]
        modes = dict(map(lambda s: (s.value, s), ThermostatMode))
        try:
            return modes[hex_mode.decode()]
        except KeyError:
            return ThermostatMode.COOL

    def get_thermostat_temp(self) -> float:
        """Return the current temp of the thermostat."""
        return int(self._hex_response[154:156] + self._hex_response[152:154], 16) / 10

    def get_thermostat_target_temp(self) -> int:
        """Return the current temperature of the thermostat."""
        hex_temp = self._hex_response[160:162]
        return int(hex_temp, 16)

    def get_thermostat_fan_level(self) -> ThermostatFanLevel:
        """Return the current thermostat fan level."""
        hex_level = self._hex_response[162:163].decode()
        levels = dict(map(lambda s: (s.value, s), ThermostatFanLevel))
        try:
            return levels[hex_level]
        except KeyError:
            return ThermostatFanLevel.LOW

    def get_thermostat_swing(self) -> ThermostatSwing:
        """Return the current thermostat fan swing."""
        hex_swing = self._hex_response[163:164].decode()
        return (
            ThermostatSwing.OFF
            if hex_swing == ThermostatSwing.OFF.value
            else ThermostatSwing.ON
        )

    def get_thermostat_remote_id(self) -> str:
        """Return the current thermostat remote."""
        remote_hex = unhexlify(self._hex_response)
        return remote_hex[84:92].decode().rstrip("\x00")

    def get_shutter_position(self, index: int) -> int:
        """Return the current shutter position."""
        start_index = 152 + (index * 32)
        end_index = start_index + 2
        hex_pos = self._hex_response[start_index:end_index].decode()
        return int(hex_pos, 16)

    def get_shutter_direction(self, index: int) -> ShutterDirection:
        """Return the current shutter direction."""
        start_index = 156 + (index * 32)
        end_index = start_index + 4
        hex_dir = self._hex_response[start_index:end_index].decode()
        directions = dict(map(lambda s: (s.value, s), ShutterDirection))
        return directions[hex_dir]

    def get_shutter_child_lock(self, index: int) -> ShutterChildLock:
        """Return the current shutter child lock."""
        start_index = 154 + (index * 32)
        end_index = start_index + 2
        hex_pos = self._hex_response[start_index:end_index].decode()
        hex_device_state = hex_pos[0:2]
        return (
            ShutterChildLock.ON
            if hex_device_state == ShutterChildLock.ON.value
            else ShutterChildLock.OFF
        )

    def get_light_state(self, index: int) -> DeviceState:
        """Return the current light state."""
        start_index = 152 + (index * 32)
        end_index = start_index + 2
        hex_pos = self._hex_response[start_index:end_index].decode()
        hex_device_state = hex_pos[0:2]
        return (
            DeviceState.ON
            if hex_device_state == DeviceState.ON.value
            else DeviceState.OFF
        )

get_auto_shutdown() ⚓︎

Return the value of the auto shutdown configuration.

Source code in src/aioswitcher/api/messages.py
def get_auto_shutdown(self) -> str:
    """Return the value of the auto shutdown configuration."""
    hex_auto_off = self._hex_response[194:202]
    auto_off_seconds = int(
        hex_auto_off[6:8]
        + hex_auto_off[4:6]
        + hex_auto_off[2:4]
        + hex_auto_off[0:2],
        16,
    )
    return seconds_to_iso_time(auto_off_seconds)

get_light_state(index) ⚓︎

Return the current light state.

Source code in src/aioswitcher/api/messages.py
def get_light_state(self, index: int) -> DeviceState:
    """Return the current light state."""
    start_index = 152 + (index * 32)
    end_index = start_index + 2
    hex_pos = self._hex_response[start_index:end_index].decode()
    hex_device_state = hex_pos[0:2]
    return (
        DeviceState.ON
        if hex_device_state == DeviceState.ON.value
        else DeviceState.OFF
    )

get_power_consumption() ⚓︎

Return the current power consumption of the device.

Source code in src/aioswitcher/api/messages.py
def get_power_consumption(self) -> int:
    """Return the current power consumption of the device."""
    hex_power = self._hex_response[154:162]
    return int(hex_power[2:4] + hex_power[0:2], 16)

get_shutter_child_lock(index) ⚓︎

Return the current shutter child lock.

Source code in src/aioswitcher/api/messages.py
def get_shutter_child_lock(self, index: int) -> ShutterChildLock:
    """Return the current shutter child lock."""
    start_index = 154 + (index * 32)
    end_index = start_index + 2
    hex_pos = self._hex_response[start_index:end_index].decode()
    hex_device_state = hex_pos[0:2]
    return (
        ShutterChildLock.ON
        if hex_device_state == ShutterChildLock.ON.value
        else ShutterChildLock.OFF
    )

get_shutter_direction(index) ⚓︎

Return the current shutter direction.

Source code in src/aioswitcher/api/messages.py
def get_shutter_direction(self, index: int) -> ShutterDirection:
    """Return the current shutter direction."""
    start_index = 156 + (index * 32)
    end_index = start_index + 4
    hex_dir = self._hex_response[start_index:end_index].decode()
    directions = dict(map(lambda s: (s.value, s), ShutterDirection))
    return directions[hex_dir]

get_shutter_position(index) ⚓︎

Return the current shutter position.

Source code in src/aioswitcher/api/messages.py
def get_shutter_position(self, index: int) -> int:
    """Return the current shutter position."""
    start_index = 152 + (index * 32)
    end_index = start_index + 2
    hex_pos = self._hex_response[start_index:end_index].decode()
    return int(hex_pos, 16)

get_state() ⚓︎

Return the current device state.

Source code in src/aioswitcher/api/messages.py
def get_state(self) -> DeviceState:
    """Return the current device state."""
    hex_state = self._hex_response[150:152].decode()
    states = dict(map(lambda s: (s.value, s), DeviceState))
    return states[hex_state]

get_thermostat_fan_level() ⚓︎

Return the current thermostat fan level.

Source code in src/aioswitcher/api/messages.py
def get_thermostat_fan_level(self) -> ThermostatFanLevel:
    """Return the current thermostat fan level."""
    hex_level = self._hex_response[162:163].decode()
    levels = dict(map(lambda s: (s.value, s), ThermostatFanLevel))
    try:
        return levels[hex_level]
    except KeyError:
        return ThermostatFanLevel.LOW

get_thermostat_mode() ⚓︎

Return the current thermostat mode.

Source code in src/aioswitcher/api/messages.py
def get_thermostat_mode(self) -> ThermostatMode:
    """Return the current thermostat mode."""
    hex_mode = self._hex_response[158:160]
    modes = dict(map(lambda s: (s.value, s), ThermostatMode))
    try:
        return modes[hex_mode.decode()]
    except KeyError:
        return ThermostatMode.COOL

get_thermostat_remote_id() ⚓︎

Return the current thermostat remote.

Source code in src/aioswitcher/api/messages.py
def get_thermostat_remote_id(self) -> str:
    """Return the current thermostat remote."""
    remote_hex = unhexlify(self._hex_response)
    return remote_hex[84:92].decode().rstrip("\x00")

get_thermostat_state() ⚓︎

Return the current thermostat state.

Source code in src/aioswitcher/api/messages.py
def get_thermostat_state(self) -> DeviceState:
    """Return the current thermostat state."""
    hex_power = self._hex_response[156:158].decode()
    return DeviceState.OFF if hex_power == DeviceState.OFF.value else DeviceState.ON

get_thermostat_swing() ⚓︎

Return the current thermostat fan swing.

Source code in src/aioswitcher/api/messages.py
def get_thermostat_swing(self) -> ThermostatSwing:
    """Return the current thermostat fan swing."""
    hex_swing = self._hex_response[163:164].decode()
    return (
        ThermostatSwing.OFF
        if hex_swing == ThermostatSwing.OFF.value
        else ThermostatSwing.ON
    )

get_thermostat_target_temp() ⚓︎

Return the current temperature of the thermostat.

Source code in src/aioswitcher/api/messages.py
def get_thermostat_target_temp(self) -> int:
    """Return the current temperature of the thermostat."""
    hex_temp = self._hex_response[160:162]
    return int(hex_temp, 16)

get_thermostat_temp() ⚓︎

Return the current temp of the thermostat.

Source code in src/aioswitcher/api/messages.py
def get_thermostat_temp(self) -> float:
    """Return the current temp of the thermostat."""
    return int(self._hex_response[154:156] + self._hex_response[152:154], 16) / 10

get_time_left() ⚓︎

Return the time left for the device current run.

Source code in src/aioswitcher/api/messages.py
def get_time_left(self) -> str:
    """Return the time left for the device current run."""
    hex_time_left = self._hex_response[178:186]
    time_left_seconds = int(
        hex_time_left[6:8]
        + hex_time_left[4:6]
        + hex_time_left[2:4]
        + hex_time_left[0:2],
        16,
    )
    return seconds_to_iso_time(time_left_seconds)

get_time_on() ⚓︎

Return how long the device has been on.

Source code in src/aioswitcher/api/messages.py
def get_time_on(self) -> str:
    """Return how long the device has been on."""
    hex_time_on = self._hex_response[186:194]
    time_on_seconds = int(
        hex_time_on[6:8] + hex_time_on[4:6] + hex_time_on[2:4] + hex_time_on[0:2],
        16,
    )
    return seconds_to_iso_time(time_on_seconds)

SwitcherBaseResponse dataclass ⚓︎

Representation of the switcher base response message.

Applicable for all messages that do no require post initialization. e.g. not applicable for SwitcherLoginResponse, SwitcherStateResponse, SwitcherGetScheduleResponse.

Parameters:

Name Type Description Default
unparsed_response bytes

the raw response from the device.

required
Source code in src/aioswitcher/api/messages.py
@dataclass
class SwitcherBaseResponse:
    """Representation of the switcher base response message.

    Applicable for all messages that do no require post initialization.
    e.g. not applicable for SwitcherLoginResponse, SwitcherStateResponse,
    SwitcherGetScheduleResponse.

    Args:
        unparsed_response: the raw response from the device.

    """

    unparsed_response: bytes

    @property
    def successful(self) -> bool:
        """Return true if the response is not empty.

        Partially indicating the request was successful.
        """
        return self.unparsed_response is not None and len(self.unparsed_response) > 0

successful: bool property ⚓︎

Return true if the response is not empty.

Partially indicating the request was successful.

SwitcherGetSchedulesResponse dataclass ⚓︎

Bases: SwitcherBaseResponse

Representation of the switcher get schedule message.

Source code in src/aioswitcher/api/messages.py
@final
@dataclass
class SwitcherGetSchedulesResponse(SwitcherBaseResponse):
    """Representation of the switcher get schedule message."""

    schedules: Set[SwitcherSchedule] = field(init=False)

    def __post_init__(self) -> None:
        """Post initialization of the message."""
        self.schedules = get_schedules(self.unparsed_response)

    @property
    def found_schedules(self) -> bool:
        """Return true if found schedules in the response."""
        return len(self.schedules) > 0

found_schedules: bool property ⚓︎

Return true if found schedules in the response.

SwitcherLightStateResponse dataclass ⚓︎

Bases: SwitcherBaseResponse

Representation of the Switcher light devices state response message.

Source code in src/aioswitcher/api/messages.py
@final
@dataclass
class SwitcherLightStateResponse(SwitcherBaseResponse):
    """Representation of the Switcher light devices state response message."""

    state: DeviceState = field(init=False)
    device_type: DeviceType
    index: int

    def __post_init__(self) -> None:
        """Post initialization of the message."""
        parser = StateMessageParser(self.unparsed_response)
        index = get_light_discovery_packet_index(self.device_type, self.index)

        self.state = parser.get_light_state(index)

SwitcherLoginResponse dataclass ⚓︎

Bases: SwitcherBaseResponse

Representations of the switcher login response message.

Source code in src/aioswitcher/api/messages.py
@final
@dataclass
class SwitcherLoginResponse(SwitcherBaseResponse):
    """Representations of the switcher login response message."""

    session_id: str = field(init=False)

    def __post_init__(self) -> None:
        """Post initialization of the response."""
        try:
            self.session_id = hexlify(self.unparsed_response)[16:24].decode()
        except Exception as exc:
            raise ValueError("failed to parse login response message") from exc

SwitcherShutterStateResponse dataclass ⚓︎

Bases: SwitcherBaseResponse

Representation of the Switcher shutter devices state response message.

Source code in src/aioswitcher/api/messages.py
@final
@dataclass
class SwitcherShutterStateResponse(SwitcherBaseResponse):
    """Representation of the Switcher shutter devices state response message."""

    position: int = field(init=False)
    direction: ShutterDirection = field(init=False)
    child_lock: ShutterChildLock = field(init=False)
    device_type: DeviceType
    index: int

    def __post_init__(self) -> None:
        """Post initialization of the message."""
        parser = StateMessageParser(self.unparsed_response)
        index = get_shutter_discovery_packet_index(self.device_type, self.index)

        self.direction = parser.get_shutter_direction(index)
        self.position = parser.get_shutter_position(index)
        self.child_lock = parser.get_shutter_child_lock(index)

SwitcherStateResponse dataclass ⚓︎

Bases: SwitcherBaseResponse

Representation of the switcher state response message.

Source code in src/aioswitcher/api/messages.py
@final
@dataclass
class SwitcherStateResponse(SwitcherBaseResponse):
    """Representation of the switcher state response message."""

    state: DeviceState = field(init=False)
    time_left: str = field(init=False)
    time_on: str = field(init=False)
    auto_shutdown: str = field(init=False)
    power_consumption: int = field(init=False)
    electric_current: float = field(init=False)

    def __post_init__(self) -> None:
        """Post initialization of the message."""
        parser = StateMessageParser(self.unparsed_response)

        self.state = parser.get_state()
        self.time_left = parser.get_time_left()
        self.time_on = parser.get_time_on()
        self.auto_shutdown = parser.get_auto_shutdown()
        self.power_consumption = parser.get_power_consumption()
        self.electric_current = watts_to_amps(self.power_consumption)

SwitcherThermostatStateResponse dataclass ⚓︎

Bases: SwitcherBaseResponse

Representation of the Switcher thermostat device state response message.

Source code in src/aioswitcher/api/messages.py
@final
@dataclass
class SwitcherThermostatStateResponse(SwitcherBaseResponse):
    """Representation of the Switcher thermostat device state response message."""

    state: DeviceState = field(init=False)
    mode: ThermostatMode = field(init=False)
    fan_level: ThermostatFanLevel = field(init=False)
    temperature: float = field(init=False)
    target_temperature: int = field(init=False)
    swing: ThermostatSwing = field(init=False)
    remote_id: str = field(init=False)

    def __post_init__(self) -> None:
        """Post initialization of the message."""
        parser = StateMessageParser(self.unparsed_response)

        self.state = parser.get_thermostat_state()
        self.mode = parser.get_thermostat_mode()
        self.fan_level = parser.get_thermostat_fan_level()
        self.temperature = parser.get_thermostat_temp()
        self.target_temperature = parser.get_thermostat_target_temp()
        self.swing = parser.get_thermostat_swing()
        self.remote_id = parser.get_thermostat_remote_id()

Switcher integration API remote related classes and functions.

SwitcherBreezeCommand ⚓︎

Representations of the Switcher Breeze command message.

Parameters:

Name Type Description Default
command str

a string command ready to be parsed and sent

required
Source code in src/aioswitcher/api/remotes.py
@final
class SwitcherBreezeCommand:
    """Representations of the Switcher Breeze command message.

    Args:
        command: a string command ready to be parsed and sent

    """

    def __init__(self, command: str) -> None:
        """Initialize the Breeze command."""
        self.command = command
        self.length = self._get_command_length()

    def _get_command_length(self) -> str:
        """Get command length.

        Note:
            This is a private function used by other functions, do not call this
            function directly.

        """
        return "{:x}".format(int(len(self.command) / 2)).ljust(4, "0")

SwitcherBreezeRemote ⚓︎

Class that represent a remote for a Breeze device/s.

Parameters:

Name Type Description Default
ir_set Dict[str, Any]

a dictionary for all supported remotes

required

Methods:

Name Description
build_command

Build command that controls the Breeze device.

build_swing_command

Build a special command to control swing on special remotes.

Source code in src/aioswitcher/api/remotes.py
@final
class SwitcherBreezeRemote:
    """Class that represent a remote for a Breeze device/s.

    Args:
        ir_set: a dictionary for all supported remotes

    """

    def __init__(self, ir_set: Dict[str, Any]) -> None:
        """Initialize the remote by parsing the ir_set data."""
        self._min_temp = 100  # ridiculously high number
        self._max_temp = -100  # ridiculously low number
        self._on_off_type = False
        self._remote_id: str = ir_set["IRSetID"]
        # _ir_wave_map hosts a shrunk version of the ir_set file which ignores
        # unused data and map key to dict{"HexCode": str, "Para": str}
        # this is being built by the _resolve_capabilities method
        self._ir_wave_map: Dict[str, Dict[str, str]] = {}
        self._modes_features: Dict[ThermostatMode, Dict[str, Any]] = {}
        """
        self._modes_features basically explains the available features
            (Swing/Fan levels/ temp control of each mode)
        Example of _modes_features for ELEC7022 IRSet remote
        {
            < ThermostatMode.AUTO: ('01', 'auto') >: {
                'swing': False,
                'fan_levels': set(),
                'temperature_control': False
            }, < ThermostatMode.DRY: ('02', 'dry') >: {
                'swing': False,
                'fan_levels': set(),
                'temperature_control': False
            }, < ThermostatMode.FAN: ('03', 'fan') >: {
                'swing': False,
                'fan_levels': {
                    < ThermostatFanLevel.HIGH: ('3', 'high') > ,
                    < ThermostatFanLevel.AUTO: ('0', 'auto') > ,
                    < ThermostatFanLevel.MEDIUM: ('2', 'medium') > ,
                    < ThermostatFanLevel.LOW: ('1', 'low') >
                },
                'temperature_control': False
            }, < ThermostatMode.COOL: ('04', 'cool') >: {
                'swing': False,
                'fan_levels': {
                    < ThermostatFanLevel.HIGH: ('3', 'high') > ,
                    < ThermostatFanLevel.AUTO: ('0', 'auto') > ,
                    < ThermostatFanLevel.MEDIUM: ('2', 'medium') > ,
                    < ThermostatFanLevel.LOW: ('1', 'low') >
                },
                'temperature_control': True
            }, < ThermostatMode.HEAT: ('05', 'heat') >: {
                'swing': True,
                'fan_levels': {
                    < ThermostatFanLevel.HIGH: ('3', 'high') > ,
                    < ThermostatFanLevel.AUTO: ('0', 'auto') > ,
                    < ThermostatFanLevel.MEDIUM: ('2', 'medium') > ,
                    < ThermostatFanLevel.LOW: ('1', 'low') >
                },
                'temperature_control': True
            }
        }
        """
        self._separated_swing_command = (
            self._remote_id in SPECIAL_SWING_COMMAND_REMOTE_IDS
        )

        self._resolve_capabilities(ir_set)

    @property
    def modes_features(
        self,
    ) -> Dict[ThermostatMode, Dict[str, Any]]:
        """Getter for supported feature per mode."""
        return self._modes_features

    @property
    def supported_modes(self) -> List[ThermostatMode]:
        """Getter for supported modes."""
        return list(self.modes_features.keys())

    @property
    def max_temperature(self) -> int:
        """Getter for Maximum supported temperature."""
        return self._max_temp

    @property
    def min_temperature(self) -> int:
        """Getter for Minimum supported temperature."""
        return self._min_temp

    @property
    def remote_id(self) -> str:
        """Getter for remote id."""
        return self._remote_id

    @property
    def separated_swing_command(self) -> bool:
        """Getter for which indicates if the AC has a separated swing command."""
        return self._separated_swing_command

    @property
    def on_off_type(self) -> bool:
        """Getter for which indicates if the AC if on/off (toggle) type."""
        return self._on_off_type

    def _lookup_key_in_irset(self, key: List[str]) -> None:
        """Use this to look for a key in the IRSet file.

        Args:
            key: a reference to List of strings representing parts of the command.

        Note:
            This is a private function used by other functions, do not call this
            function directly.

        """
        while (
            len(key) != 1
        ):  # we match this condition with the key contains at least the mode
            # Try to lookup the key as is in the ir set map
            if "".join(key) not in self._ir_wave_map:
                # we didn't find a key, remove feature from the key and try to
                # look again.
                # The first feature removed is the swing "_d1"
                # Secondly is the fan level (_f0, _f1, _f2, _f3)
                # lastly we stay at least with the mode part
                removed_element = key.pop()
                logger.debug(f"Removed {removed_element} from the key")
            else:
                # found a match, with modified list
                return

    def build_swing_command(self, swing: ThermostatSwing) -> SwitcherBreezeCommand:
        """Build a special command to control swing on special remotes.

        Args:
            swing: the desired swing state

        Returns:
            An instance of ``SwitcherBreezeCommand``

        """
        key = "FUN_d0" if swing == ThermostatSwing.OFF else "FUN_d1"
        try:
            command = (
                self._ir_wave_map["".join(key)]["Para"]
                + "|"
                + self._ir_wave_map["".join(key)]["HexCode"]
            )
        except KeyError:
            logger.error(
                f'The special swing key "{key}"        \
                    does not exist in the IRSet database!'
            )
            raise RuntimeError(
                f'The special swing key "{key}"'
                " does not exist in the IRSet database!"
            )

        return SwitcherBreezeCommand(
            "00000000" + hexlify(str(command).encode()).decode()
        )

    def build_command(
        self,
        state: DeviceState,
        mode: ThermostatMode,
        target_temp: int,
        fan_level: ThermostatFanLevel,
        swing: ThermostatSwing,
        current_state: Union[DeviceState, None] = None,
    ) -> SwitcherBreezeCommand:
        """Build command that controls the Breeze device.

        Args:
            state: the desired state of the device
            mode: the desired mode of the device
            target_temp: the target temperature
            fan_level: the desired fan level
            swing: the desired swing state
            current_state: optionally, for toggle device, pass previous state to avoid
                redundant requests

        Returns:
            An instance of ``SwitcherBreezeCommand``

        """
        key: List[str] = []
        command = ""
        # verify the target temp and set maximum if we provided with higher number
        if target_temp > self._max_temp:
            target_temp = self._max_temp

        # verify the target temp and set minimum if we provided with lower number
        elif target_temp < self._min_temp:
            target_temp = self._min_temp

        if mode not in self.supported_modes:
            raise RuntimeError(
                f'Invalid mode "{mode.display}", available modes for this device are: '
                f"{', '.join([x.display for x in self.supported_modes])}"
            )

        # non toggle AC, just turn it off
        if not self._on_off_type and state == DeviceState.OFF:
            key.append("off")
        else:
            # This is a toggle mode AC, we determine here whether the first bit should
            # be on or off in order to change the AC state based on its current state.
            if self._on_off_type and current_state and current_state != state:
                # This is a toggle mode AC.
                key.append("on_")

            # for toggle mode AC - set state. for non toggle AC mode set state and turn
            # it on.
            if self._on_off_type or (not self._on_off_type and state == DeviceState.ON):
                # Auto and Dry can sometimes have a FAN level and in other cases
                # it might not have. in any case we try to add the request fan
                # level to the key, if we get a match we fulfill the request, otherwise
                # we remove the fan and lookup the key again
                if mode in [
                    ThermostatMode.AUTO,
                    ThermostatMode.DRY,
                    ThermostatMode.FAN,
                ]:
                    # the command key should start with mode (aa/ad/ar/ah)
                    key.append(MODE_TO_COMMAND[mode])
                    # add the requested fan level (_f0, _f1, _f2, _f3)
                    key.append("_" + FAN_LEVEL_TO_COMMAND[fan_level])

                    # add the swing On (_d1) to the key
                    if swing == ThermostatSwing.ON:
                        key.append("_d1")

                    self._lookup_key_in_irset(key)

                if mode in [ThermostatMode.COOL, ThermostatMode.HEAT]:
                    key.append(MODE_TO_COMMAND[mode])
                    key.append(str(target_temp))
                    key.append("_" + FAN_LEVEL_TO_COMMAND[fan_level])
                    if swing == ThermostatSwing.ON:
                        key.append("_d1")

                    self._lookup_key_in_irset(key)

        command = (
            self._ir_wave_map["".join(key)]["Para"]
            + "|"
            + self._ir_wave_map["".join(key)]["HexCode"]
        )
        return SwitcherBreezeCommand(
            "00000000" + hexlify(str(command).encode()).decode()
        )

    def _resolve_capabilities(self, ir_set: Dict[str, Any]) -> None:
        """Parse the ir_set of the remote and build capability data struct.

        Args:
            ir_set: a dictionary for all supported remotes

        Note:
            This is a private function used by other functions, do not call this
            function directly.

        """
        if ir_set["OnOffType"] == 1:
            self._on_off_type = True

        mode = None

        for wave in ir_set["IRWaveList"]:
            key = wave["Key"]
            try:
                mode = COMMAND_TO_MODE[key[0:2]]
                if mode not in self._modes_features:
                    self._modes_features[mode] = {
                        "swing": False,
                        "fan_levels": set(),
                        "temperature_control": False,
                    }

                    # This type of ACs support swing mode in every mode
                    if self.separated_swing_command:
                        self._modes_features[mode]["swing"] = True

            except KeyError:
                pass

            fan_level = re.match(r".+(f\d)", key)
            if fan_level and mode:
                self._modes_features[mode]["fan_levels"].add(
                    COMMAND_TO_FAN_LEVEL[fan_level.group(1)]
                )

            temp = key[2:4]
            if temp.isdigit():
                if mode and not self._modes_features[mode]["temperature_control"]:
                    self._modes_features[mode]["temperature_control"] = True
                temp = int(temp)
                if temp > self._max_temp:
                    self._max_temp = temp
                if temp < self._min_temp:
                    self._min_temp = temp

            if mode:
                self._modes_features[mode]["swing"] |= "d1" in key

            self._ir_wave_map[key] = {"Para": wave["Para"], "HexCode": wave["HexCode"]}

max_temperature: int property ⚓︎

Getter for Maximum supported temperature.

min_temperature: int property ⚓︎

Getter for Minimum supported temperature.

modes_features: Dict[ThermostatMode, Dict[str, Any]] property ⚓︎

Getter for supported feature per mode.

on_off_type: bool property ⚓︎

Getter for which indicates if the AC if on/off (toggle) type.

remote_id: str property ⚓︎

Getter for remote id.

separated_swing_command: bool property ⚓︎

Getter for which indicates if the AC has a separated swing command.

supported_modes: List[ThermostatMode] property ⚓︎

Getter for supported modes.

build_command(state, mode, target_temp, fan_level, swing, current_state=None) ⚓︎

Build command that controls the Breeze device.

Parameters:

Name Type Description Default
state DeviceState

the desired state of the device

required
mode ThermostatMode

the desired mode of the device

required
target_temp int

the target temperature

required
fan_level ThermostatFanLevel

the desired fan level

required
swing ThermostatSwing

the desired swing state

required
current_state Union[DeviceState, None]

optionally, for toggle device, pass previous state to avoid redundant requests

None

Returns:

Type Description
SwitcherBreezeCommand

An instance of SwitcherBreezeCommand

Source code in src/aioswitcher/api/remotes.py
def build_command(
    self,
    state: DeviceState,
    mode: ThermostatMode,
    target_temp: int,
    fan_level: ThermostatFanLevel,
    swing: ThermostatSwing,
    current_state: Union[DeviceState, None] = None,
) -> SwitcherBreezeCommand:
    """Build command that controls the Breeze device.

    Args:
        state: the desired state of the device
        mode: the desired mode of the device
        target_temp: the target temperature
        fan_level: the desired fan level
        swing: the desired swing state
        current_state: optionally, for toggle device, pass previous state to avoid
            redundant requests

    Returns:
        An instance of ``SwitcherBreezeCommand``

    """
    key: List[str] = []
    command = ""
    # verify the target temp and set maximum if we provided with higher number
    if target_temp > self._max_temp:
        target_temp = self._max_temp

    # verify the target temp and set minimum if we provided with lower number
    elif target_temp < self._min_temp:
        target_temp = self._min_temp

    if mode not in self.supported_modes:
        raise RuntimeError(
            f'Invalid mode "{mode.display}", available modes for this device are: '
            f"{', '.join([x.display for x in self.supported_modes])}"
        )

    # non toggle AC, just turn it off
    if not self._on_off_type and state == DeviceState.OFF:
        key.append("off")
    else:
        # This is a toggle mode AC, we determine here whether the first bit should
        # be on or off in order to change the AC state based on its current state.
        if self._on_off_type and current_state and current_state != state:
            # This is a toggle mode AC.
            key.append("on_")

        # for toggle mode AC - set state. for non toggle AC mode set state and turn
        # it on.
        if self._on_off_type or (not self._on_off_type and state == DeviceState.ON):
            # Auto and Dry can sometimes have a FAN level and in other cases
            # it might not have. in any case we try to add the request fan
            # level to the key, if we get a match we fulfill the request, otherwise
            # we remove the fan and lookup the key again
            if mode in [
                ThermostatMode.AUTO,
                ThermostatMode.DRY,
                ThermostatMode.FAN,
            ]:
                # the command key should start with mode (aa/ad/ar/ah)
                key.append(MODE_TO_COMMAND[mode])
                # add the requested fan level (_f0, _f1, _f2, _f3)
                key.append("_" + FAN_LEVEL_TO_COMMAND[fan_level])

                # add the swing On (_d1) to the key
                if swing == ThermostatSwing.ON:
                    key.append("_d1")

                self._lookup_key_in_irset(key)

            if mode in [ThermostatMode.COOL, ThermostatMode.HEAT]:
                key.append(MODE_TO_COMMAND[mode])
                key.append(str(target_temp))
                key.append("_" + FAN_LEVEL_TO_COMMAND[fan_level])
                if swing == ThermostatSwing.ON:
                    key.append("_d1")

                self._lookup_key_in_irset(key)

    command = (
        self._ir_wave_map["".join(key)]["Para"]
        + "|"
        + self._ir_wave_map["".join(key)]["HexCode"]
    )
    return SwitcherBreezeCommand(
        "00000000" + hexlify(str(command).encode()).decode()
    )

build_swing_command(swing) ⚓︎

Build a special command to control swing on special remotes.

Parameters:

Name Type Description Default
swing ThermostatSwing

the desired swing state

required

Returns:

Type Description
SwitcherBreezeCommand

An instance of SwitcherBreezeCommand

Source code in src/aioswitcher/api/remotes.py
def build_swing_command(self, swing: ThermostatSwing) -> SwitcherBreezeCommand:
    """Build a special command to control swing on special remotes.

    Args:
        swing: the desired swing state

    Returns:
        An instance of ``SwitcherBreezeCommand``

    """
    key = "FUN_d0" if swing == ThermostatSwing.OFF else "FUN_d1"
    try:
        command = (
            self._ir_wave_map["".join(key)]["Para"]
            + "|"
            + self._ir_wave_map["".join(key)]["HexCode"]
        )
    except KeyError:
        logger.error(
            f'The special swing key "{key}"        \
                does not exist in the IRSet database!'
        )
        raise RuntimeError(
            f'The special swing key "{key}"'
            " does not exist in the IRSet database!"
        )

    return SwitcherBreezeCommand(
        "00000000" + hexlify(str(command).encode()).decode()
    )

SwitcherBreezeRemoteManager ⚓︎

Class for managing Breeze remotes.

Parameters:

Name Type Description Default
remotes_db_path str

optional path of supported remote json file

BREEZE_REMOTE_DB_FPATH

Methods:

Name Description
get_remote

Get Breeze remote by the remote id.

Source code in src/aioswitcher/api/remotes.py
class SwitcherBreezeRemoteManager:
    """Class for managing Breeze remotes.

    Args:
        remotes_db_path: optional path of supported remote json file

    """

    def __init__(self, remotes_db_path: str = BREEZE_REMOTE_DB_FPATH) -> None:
        """Initialize the Remote manager."""
        self._remotes_db: Dict[str, SwitcherBreezeRemote] = {}
        self._remotes_db_fpath = remotes_db_path

    def get_remote(self, remote_id: str) -> SwitcherBreezeRemote:
        """Get Breeze remote by the remote id.

        Args:
            remote_id: the id of the desired remote

        Returns:
            an instance of ``SwitcherBreezeRemote``

        """
        # check if the remote was already loaded
        if remote_id not in self._remotes_db:
            # load the remote into the memory
            with open(self._remotes_db_fpath) as remotes_fd:
                self._remotes_db[remote_id] = SwitcherBreezeRemote(
                    load(remotes_fd)[remote_id]
                )

        return self._remotes_db[remote_id]

get_remote(remote_id) ⚓︎

Get Breeze remote by the remote id.

Parameters:

Name Type Description Default
remote_id str

the id of the desired remote

required

Returns:

Type Description
SwitcherBreezeRemote

an instance of SwitcherBreezeRemote

Source code in src/aioswitcher/api/remotes.py
def get_remote(self, remote_id: str) -> SwitcherBreezeRemote:
    """Get Breeze remote by the remote id.

    Args:
        remote_id: the id of the desired remote

    Returns:
        an instance of ``SwitcherBreezeRemote``

    """
    # check if the remote was already loaded
    if remote_id not in self._remotes_db:
        # load the remote into the memory
        with open(self._remotes_db_fpath) as remotes_fd:
            self._remotes_db[remote_id] = SwitcherBreezeRemote(
                load(remotes_fd)[remote_id]
            )

    return self._remotes_db[remote_id]

Switcher integration, UDP Bridge module.

DatagramParser dataclass ⚓︎

Utility class for parsing a datagram into various device properties.

Methods:

Name Description
get_auto_shutdown

Extract the auto shutdown value from the broadcast message.

get_device_id

Extract the device id from the broadcast message.

get_device_key

Extract the device id from the broadcast message.

get_device_state

Extract the device state from the broadcast message.

get_device_type

Extract the device type from the broadcast message.

get_ip_type1

Extract the IP address from the type1 broadcast message (Heater, Plug).

get_ip_type2

Extract the IP address from the broadcast message (Breeze, Runners).

get_light_state

Extract the light state from the broadcast message.

get_mac_type1

Extract the MAC address from the broadcast message (Heater, Plug).

get_mac_type2

Extract the MAC address from the broadcast message (Breeze, Runners).

get_name

Extract the device name from the broadcast message.

get_power_consumption

Extract the power consumption from the broadcast message.

get_remaining

Extract the time remains for the current execution.

get_shutter_child_lock

Extract the shutter child lock state from the broadcast message.

get_shutter_direction

Return the current direction of the shutter (UP/DOWN/STOP).

get_shutter_position

Return the current position of the shutter 0 <= pos <= 100.

get_thermostat_fan_level

Return the current thermostat fan level.

get_thermostat_mode

Return the current thermostat mode.

get_thermostat_remote_id

Return the current thermostat remote.

get_thermostat_state

Return the current thermostat state.

get_thermostat_swing

Return the current thermostat fan swing.

get_thermostat_target_temp

Return the current temp of the thermostat.

get_thermostat_temp

Return the current temp of the thermostat.

is_switcher_originator

Verify the broadcast message had originated from a switcher device.

Source code in src/aioswitcher/bridge.py
@final
@dataclass(frozen=True)
class DatagramParser:
    """Utility class for parsing a datagram into various device properties."""

    message: bytes

    def is_switcher_originator(self) -> bool:
        """Verify the broadcast message had originated from a switcher device."""
        return hexlify(self.message)[0:4].decode() == "fef0" and (
            len(self.message) == 165
            or len(self.message) == 168  # Switcher Breeze
            or len(self.message) == 159  # Switcher Runner and RunnerMini
            or len(self.message) == 203  # Switcher Runner S11 and Switcher Runner S12
            or len(self.message)
            == 207  # Switcher Light SL01, Switcher Light SL01 Mini,
            # Switcher Light SL02, Switcher Light SL02 Mini and Switcher Light SL03
        )

    def get_ip_type1(self) -> str:
        """Extract the IP address from the type1 broadcast message (Heater, Plug)."""
        hex_ip = hexlify(self.message)[152:160]
        ip_addr = int(hex_ip[6:8] + hex_ip[4:6] + hex_ip[2:4] + hex_ip[0:2], 16)
        return inet_ntoa(pack("<L", ip_addr))

    def get_ip_type2(self) -> str:
        """Extract the IP address from the broadcast message (Breeze, Runners)."""
        hex_ip = hexlify(self.message)[154:162]
        ip_addr = int(hex_ip[0:2] + hex_ip[2:4] + hex_ip[4:6] + hex_ip[6:8], 16)
        return inet_ntoa(pack(">L", ip_addr))

    def get_mac_type1(self) -> str:
        """Extract the MAC address from the broadcast message (Heater, Plug)."""
        hex_mac = hexlify(self.message)[160:172].decode().upper()
        return (
            hex_mac[0:2]
            + ":"
            + hex_mac[2:4]
            + ":"
            + hex_mac[4:6]
            + ":"
            + hex_mac[6:8]
            + ":"
            + hex_mac[8:10]
            + ":"
            + hex_mac[10:12]
        )

    def get_mac_type2(self) -> str:
        """Extract the MAC address from the broadcast message (Breeze, Runners)."""
        hex_mac = hexlify(self.message)[162:174].decode().upper()
        return (
            hex_mac[0:2]
            + ":"
            + hex_mac[2:4]
            + ":"
            + hex_mac[4:6]
            + ":"
            + hex_mac[6:8]
            + ":"
            + hex_mac[8:10]
            + ":"
            + hex_mac[10:12]
        )

    def get_name(self) -> str:
        """Extract the device name from the broadcast message."""
        return self.message[42:74].decode().rstrip("\x00")

    def get_device_id(self) -> str:
        """Extract the device id from the broadcast message."""
        return hexlify(self.message)[36:42].decode()

    def get_device_key(self) -> str:
        """Extract the device id from the broadcast message."""
        return hexlify(self.message)[80:82].decode()

    def get_device_state(self) -> DeviceState:
        """Extract the device state from the broadcast message."""
        hex_device_state = hexlify(self.message)[266:268].decode()
        return (
            DeviceState.ON
            if hex_device_state == DeviceState.ON.value
            else DeviceState.OFF
        )

    def get_auto_shutdown(self) -> str:
        """Extract the auto shutdown value from the broadcast message."""
        hex_auto_shutdown_val = hexlify(self.message)[310:318]
        int_auto_shutdown_val_secs = int(
            hex_auto_shutdown_val[6:8]
            + hex_auto_shutdown_val[4:6]
            + hex_auto_shutdown_val[2:4]
            + hex_auto_shutdown_val[0:2],
            16,
        )
        return seconds_to_iso_time(int_auto_shutdown_val_secs)

    def get_power_consumption(self) -> int:
        """Extract the power consumption from the broadcast message."""
        hex_power_consumption = hexlify(self.message)[270:278]
        return int(hex_power_consumption[2:4] + hex_power_consumption[0:2], 16)

    def get_remaining(self) -> str:
        """Extract the time remains for the current execution."""
        hex_remaining_time = hexlify(self.message)[294:302]
        int_remaining_time_seconds = int(
            hex_remaining_time[6:8]
            + hex_remaining_time[4:6]
            + hex_remaining_time[2:4]
            + hex_remaining_time[0:2],
            16,
        )
        return seconds_to_iso_time(int_remaining_time_seconds)

    def get_device_type(self) -> DeviceType:
        """Extract the device type from the broadcast message."""
        hex_model = hexlify(self.message[74:76]).decode()
        devices = dict(map(lambda d: (d.hex_rep, d), DeviceType))
        return devices[hex_model]

    # Switcher Runners methods

    def get_shutter_position(self, index: int) -> int:
        """Return the current position of the shutter 0 <= pos <= 100."""
        start_index = 135 + (index * 16)
        end_index = start_index + 2
        hex_pos = hexlify(self.message[start_index:end_index]).decode()
        return int(hex_pos[2:4]) + int(hex_pos[0:2], 16)

    def get_shutter_direction(self, index: int) -> ShutterDirection:
        """Return the current direction of the shutter (UP/DOWN/STOP)."""
        start_index = 137 + (index * 16)
        end_index = start_index + 2
        hex_direction = hexlify(self.message[start_index:end_index]).decode()
        directions = dict(map(lambda d: (d.value, d), ShutterDirection))
        return directions[hex_direction]

    def get_shutter_child_lock(self, index: int) -> ShutterChildLock:
        """Extract the shutter child lock state from the broadcast message."""
        start_index = 136 + (index * 16)
        end_index = start_index + 2
        hex_pos = hexlify(self.message[start_index:end_index]).decode()
        hex_device_state = hex_pos[0:2]
        return (
            ShutterChildLock.ON
            if hex_device_state == ShutterChildLock.ON.value
            else ShutterChildLock.OFF
        )

    def get_light_state(self, index: int) -> DeviceState:
        """Extract the light state from the broadcast message."""
        start_index = 135 + (index * 16)
        end_index = start_index + 2
        hex_pos = hexlify(self.message[start_index:end_index]).decode()
        hex_device_state = hex_pos[0:2]
        return (
            DeviceState.ON
            if hex_device_state == DeviceState.ON.value
            else DeviceState.OFF
        )

    # Switcher Breeze methods

    def get_thermostat_temp(self) -> float:
        """Return the current temp of the thermostat."""
        hex_temp = hexlify(self.message[135:137]).decode()
        return int(hex_temp[2:4] + hex_temp[0:2], 16) / 10

    def get_thermostat_state(self) -> DeviceState:
        """Return the current thermostat state."""
        hex_power = hexlify(self.message[137:138]).decode()
        return DeviceState.ON if hex_power == DeviceState.ON.value else DeviceState.OFF

    def get_thermostat_mode(self) -> ThermostatMode:
        """Return the current thermostat mode."""
        hex_mode = hexlify(self.message[138:139]).decode()
        states = dict(map(lambda s: (s.value, s), ThermostatMode))
        return ThermostatMode.COOL if hex_mode not in states else states[hex_mode]

    def get_thermostat_target_temp(self) -> int:
        """Return the current temp of the thermostat."""
        hex_temp = hexlify(self.message[139:140]).decode()
        return int(hex_temp, 16)

    def get_thermostat_fan_level(self) -> ThermostatFanLevel:
        """Return the current thermostat fan level."""
        hex_level = hexlify(self.message[140:141]).decode()
        states = dict(map(lambda s: (s.value, s), ThermostatFanLevel))
        return states[hex_level[0:1]]

    def get_thermostat_swing(self) -> ThermostatSwing:
        """Return the current thermostat fan swing."""
        hex_swing = hexlify(self.message[140:141]).decode()

        return (
            ThermostatSwing.OFF
            if hex_swing[1:2] == ThermostatSwing.OFF.value
            else ThermostatSwing.ON
        )

    def get_thermostat_remote_id(self) -> str:
        """Return the current thermostat remote."""
        return self.message[143:151].decode()

get_auto_shutdown() ⚓︎

Extract the auto shutdown value from the broadcast message.

Source code in src/aioswitcher/bridge.py
def get_auto_shutdown(self) -> str:
    """Extract the auto shutdown value from the broadcast message."""
    hex_auto_shutdown_val = hexlify(self.message)[310:318]
    int_auto_shutdown_val_secs = int(
        hex_auto_shutdown_val[6:8]
        + hex_auto_shutdown_val[4:6]
        + hex_auto_shutdown_val[2:4]
        + hex_auto_shutdown_val[0:2],
        16,
    )
    return seconds_to_iso_time(int_auto_shutdown_val_secs)

get_device_id() ⚓︎

Extract the device id from the broadcast message.

Source code in src/aioswitcher/bridge.py
def get_device_id(self) -> str:
    """Extract the device id from the broadcast message."""
    return hexlify(self.message)[36:42].decode()

get_device_key() ⚓︎

Extract the device id from the broadcast message.

Source code in src/aioswitcher/bridge.py
def get_device_key(self) -> str:
    """Extract the device id from the broadcast message."""
    return hexlify(self.message)[80:82].decode()

get_device_state() ⚓︎

Extract the device state from the broadcast message.

Source code in src/aioswitcher/bridge.py
def get_device_state(self) -> DeviceState:
    """Extract the device state from the broadcast message."""
    hex_device_state = hexlify(self.message)[266:268].decode()
    return (
        DeviceState.ON
        if hex_device_state == DeviceState.ON.value
        else DeviceState.OFF
    )

get_device_type() ⚓︎

Extract the device type from the broadcast message.

Source code in src/aioswitcher/bridge.py
def get_device_type(self) -> DeviceType:
    """Extract the device type from the broadcast message."""
    hex_model = hexlify(self.message[74:76]).decode()
    devices = dict(map(lambda d: (d.hex_rep, d), DeviceType))
    return devices[hex_model]

get_ip_type1() ⚓︎

Extract the IP address from the type1 broadcast message (Heater, Plug).

Source code in src/aioswitcher/bridge.py
def get_ip_type1(self) -> str:
    """Extract the IP address from the type1 broadcast message (Heater, Plug)."""
    hex_ip = hexlify(self.message)[152:160]
    ip_addr = int(hex_ip[6:8] + hex_ip[4:6] + hex_ip[2:4] + hex_ip[0:2], 16)
    return inet_ntoa(pack("<L", ip_addr))

get_ip_type2() ⚓︎

Extract the IP address from the broadcast message (Breeze, Runners).

Source code in src/aioswitcher/bridge.py
def get_ip_type2(self) -> str:
    """Extract the IP address from the broadcast message (Breeze, Runners)."""
    hex_ip = hexlify(self.message)[154:162]
    ip_addr = int(hex_ip[0:2] + hex_ip[2:4] + hex_ip[4:6] + hex_ip[6:8], 16)
    return inet_ntoa(pack(">L", ip_addr))

get_light_state(index) ⚓︎

Extract the light state from the broadcast message.

Source code in src/aioswitcher/bridge.py
def get_light_state(self, index: int) -> DeviceState:
    """Extract the light state from the broadcast message."""
    start_index = 135 + (index * 16)
    end_index = start_index + 2
    hex_pos = hexlify(self.message[start_index:end_index]).decode()
    hex_device_state = hex_pos[0:2]
    return (
        DeviceState.ON
        if hex_device_state == DeviceState.ON.value
        else DeviceState.OFF
    )

get_mac_type1() ⚓︎

Extract the MAC address from the broadcast message (Heater, Plug).

Source code in src/aioswitcher/bridge.py
def get_mac_type1(self) -> str:
    """Extract the MAC address from the broadcast message (Heater, Plug)."""
    hex_mac = hexlify(self.message)[160:172].decode().upper()
    return (
        hex_mac[0:2]
        + ":"
        + hex_mac[2:4]
        + ":"
        + hex_mac[4:6]
        + ":"
        + hex_mac[6:8]
        + ":"
        + hex_mac[8:10]
        + ":"
        + hex_mac[10:12]
    )

get_mac_type2() ⚓︎

Extract the MAC address from the broadcast message (Breeze, Runners).

Source code in src/aioswitcher/bridge.py
def get_mac_type2(self) -> str:
    """Extract the MAC address from the broadcast message (Breeze, Runners)."""
    hex_mac = hexlify(self.message)[162:174].decode().upper()
    return (
        hex_mac[0:2]
        + ":"
        + hex_mac[2:4]
        + ":"
        + hex_mac[4:6]
        + ":"
        + hex_mac[6:8]
        + ":"
        + hex_mac[8:10]
        + ":"
        + hex_mac[10:12]
    )

get_name() ⚓︎

Extract the device name from the broadcast message.

Source code in src/aioswitcher/bridge.py
def get_name(self) -> str:
    """Extract the device name from the broadcast message."""
    return self.message[42:74].decode().rstrip("\x00")

get_power_consumption() ⚓︎

Extract the power consumption from the broadcast message.

Source code in src/aioswitcher/bridge.py
def get_power_consumption(self) -> int:
    """Extract the power consumption from the broadcast message."""
    hex_power_consumption = hexlify(self.message)[270:278]
    return int(hex_power_consumption[2:4] + hex_power_consumption[0:2], 16)

get_remaining() ⚓︎

Extract the time remains for the current execution.

Source code in src/aioswitcher/bridge.py
def get_remaining(self) -> str:
    """Extract the time remains for the current execution."""
    hex_remaining_time = hexlify(self.message)[294:302]
    int_remaining_time_seconds = int(
        hex_remaining_time[6:8]
        + hex_remaining_time[4:6]
        + hex_remaining_time[2:4]
        + hex_remaining_time[0:2],
        16,
    )
    return seconds_to_iso_time(int_remaining_time_seconds)

get_shutter_child_lock(index) ⚓︎

Extract the shutter child lock state from the broadcast message.

Source code in src/aioswitcher/bridge.py
def get_shutter_child_lock(self, index: int) -> ShutterChildLock:
    """Extract the shutter child lock state from the broadcast message."""
    start_index = 136 + (index * 16)
    end_index = start_index + 2
    hex_pos = hexlify(self.message[start_index:end_index]).decode()
    hex_device_state = hex_pos[0:2]
    return (
        ShutterChildLock.ON
        if hex_device_state == ShutterChildLock.ON.value
        else ShutterChildLock.OFF
    )

get_shutter_direction(index) ⚓︎

Return the current direction of the shutter (UP/DOWN/STOP).

Source code in src/aioswitcher/bridge.py
def get_shutter_direction(self, index: int) -> ShutterDirection:
    """Return the current direction of the shutter (UP/DOWN/STOP)."""
    start_index = 137 + (index * 16)
    end_index = start_index + 2
    hex_direction = hexlify(self.message[start_index:end_index]).decode()
    directions = dict(map(lambda d: (d.value, d), ShutterDirection))
    return directions[hex_direction]

get_shutter_position(index) ⚓︎

Return the current position of the shutter 0 <= pos <= 100.

Source code in src/aioswitcher/bridge.py
def get_shutter_position(self, index: int) -> int:
    """Return the current position of the shutter 0 <= pos <= 100."""
    start_index = 135 + (index * 16)
    end_index = start_index + 2
    hex_pos = hexlify(self.message[start_index:end_index]).decode()
    return int(hex_pos[2:4]) + int(hex_pos[0:2], 16)

get_thermostat_fan_level() ⚓︎

Return the current thermostat fan level.

Source code in src/aioswitcher/bridge.py
def get_thermostat_fan_level(self) -> ThermostatFanLevel:
    """Return the current thermostat fan level."""
    hex_level = hexlify(self.message[140:141]).decode()
    states = dict(map(lambda s: (s.value, s), ThermostatFanLevel))
    return states[hex_level[0:1]]

get_thermostat_mode() ⚓︎

Return the current thermostat mode.

Source code in src/aioswitcher/bridge.py
def get_thermostat_mode(self) -> ThermostatMode:
    """Return the current thermostat mode."""
    hex_mode = hexlify(self.message[138:139]).decode()
    states = dict(map(lambda s: (s.value, s), ThermostatMode))
    return ThermostatMode.COOL if hex_mode not in states else states[hex_mode]

get_thermostat_remote_id() ⚓︎

Return the current thermostat remote.

Source code in src/aioswitcher/bridge.py
def get_thermostat_remote_id(self) -> str:
    """Return the current thermostat remote."""
    return self.message[143:151].decode()

get_thermostat_state() ⚓︎

Return the current thermostat state.

Source code in src/aioswitcher/bridge.py
def get_thermostat_state(self) -> DeviceState:
    """Return the current thermostat state."""
    hex_power = hexlify(self.message[137:138]).decode()
    return DeviceState.ON if hex_power == DeviceState.ON.value else DeviceState.OFF

get_thermostat_swing() ⚓︎

Return the current thermostat fan swing.

Source code in src/aioswitcher/bridge.py
def get_thermostat_swing(self) -> ThermostatSwing:
    """Return the current thermostat fan swing."""
    hex_swing = hexlify(self.message[140:141]).decode()

    return (
        ThermostatSwing.OFF
        if hex_swing[1:2] == ThermostatSwing.OFF.value
        else ThermostatSwing.ON
    )

get_thermostat_target_temp() ⚓︎

Return the current temp of the thermostat.

Source code in src/aioswitcher/bridge.py
def get_thermostat_target_temp(self) -> int:
    """Return the current temp of the thermostat."""
    hex_temp = hexlify(self.message[139:140]).decode()
    return int(hex_temp, 16)

get_thermostat_temp() ⚓︎

Return the current temp of the thermostat.

Source code in src/aioswitcher/bridge.py
def get_thermostat_temp(self) -> float:
    """Return the current temp of the thermostat."""
    hex_temp = hexlify(self.message[135:137]).decode()
    return int(hex_temp[2:4] + hex_temp[0:2], 16) / 10

is_switcher_originator() ⚓︎

Verify the broadcast message had originated from a switcher device.

Source code in src/aioswitcher/bridge.py
def is_switcher_originator(self) -> bool:
    """Verify the broadcast message had originated from a switcher device."""
    return hexlify(self.message)[0:4].decode() == "fef0" and (
        len(self.message) == 165
        or len(self.message) == 168  # Switcher Breeze
        or len(self.message) == 159  # Switcher Runner and RunnerMini
        or len(self.message) == 203  # Switcher Runner S11 and Switcher Runner S12
        or len(self.message)
        == 207  # Switcher Light SL01, Switcher Light SL01 Mini,
        # Switcher Light SL02, Switcher Light SL02 Mini and Switcher Light SL03
    )

SwitcherBridge ⚓︎

Use for running a UDP client for bridging Switcher devices broadcast messages.

Parameters:

Name Type Description Default
on_device Callable[[SwitcherBase], Any]

a callable to which every new SwitcherBase device found will be send.

required
broadcast_ports List[int]

broadcast ports list, default for type 1 devices is 20002, default for type 2 devices is 20003. On newer type1 devices, the port is 10002. On newer type2 devices, the port is 10003.

[SWITCHER_UDP_PORT_TYPE1, SWITCHER_UDP_PORT_TYPE1_NEW_VERSION, SWITCHER_UDP_PORT_TYPE2, SWITCHER_UDP_PORT_TYPE2_NEW_VERSION]

Methods:

Name Description
start

Create an asynchronous listener and start the bridge.

stop

Stop the asynchronous bridge.

Source code in src/aioswitcher/bridge.py
@final
class SwitcherBridge:
    """Use for running a UDP client for bridging Switcher devices broadcast messages.

    Args:
        on_device: a callable to which every new SwitcherBase device found will be send.
        broadcast_ports: broadcast ports list, default for type 1 devices is 20002,
            default for type 2 devices is 20003.
            On newer type1 devices, the port is 10002.
            On newer type2 devices, the port is 10003.

    """

    def __init__(
        self,
        on_device: Callable[[SwitcherBase], Any],
        broadcast_ports: List[int] = [
            SWITCHER_UDP_PORT_TYPE1,
            SWITCHER_UDP_PORT_TYPE1_NEW_VERSION,
            SWITCHER_UDP_PORT_TYPE2,
            SWITCHER_UDP_PORT_TYPE2_NEW_VERSION,
        ],
    ) -> None:
        """Initialize the switcher bridge."""
        self._on_device = on_device
        self._broadcast_ports = broadcast_ports
        self._is_running = False
        self._transports: Dict[int, Optional[BaseTransport]] = {}

    async def __aenter__(self) -> "SwitcherBridge":
        """Enter SwitcherBridge asynchronous context manager."""
        await self.start()
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Exit the SwitcherBridge asynchronous context manager."""
        await self.stop()

    async def start(self) -> None:
        """Create an asynchronous listener and start the bridge."""
        for broadcast_port in self._broadcast_ports:
            logger.info("starting the udp bridge on port %s", broadcast_port)
            protocol_factory = UdpClientProtocol(
                partial(_parse_device_from_datagram, self._on_device)
            )
            transport, protocol = await get_running_loop().create_datagram_endpoint(
                lambda: protocol_factory,
                local_addr=("0.0.0.0", broadcast_port),  # nosec
                family=AF_INET,
            )
            self._transports[broadcast_port] = transport
            logger.debug("udp bridge on port %s started", broadcast_port)

        self._is_running = True

    async def stop(self) -> None:
        """Stop the asynchronous bridge."""
        for broadcast_port in self._broadcast_ports:
            transport = self._transports.get(broadcast_port)

            if transport and not transport.is_closing():
                logger.info("stopping the udp bridge on port %s", broadcast_port)
                transport.close()
            else:
                logger.info("udp bridge on port %s not started", broadcast_port)

        self._is_running = False

    @property
    def is_running(self) -> bool:
        """bool: Return true if bridge is running."""
        return self._is_running

is_running: bool property ⚓︎

bool: Return true if bridge is running.

start() async ⚓︎

Create an asynchronous listener and start the bridge.

Source code in src/aioswitcher/bridge.py
async def start(self) -> None:
    """Create an asynchronous listener and start the bridge."""
    for broadcast_port in self._broadcast_ports:
        logger.info("starting the udp bridge on port %s", broadcast_port)
        protocol_factory = UdpClientProtocol(
            partial(_parse_device_from_datagram, self._on_device)
        )
        transport, protocol = await get_running_loop().create_datagram_endpoint(
            lambda: protocol_factory,
            local_addr=("0.0.0.0", broadcast_port),  # nosec
            family=AF_INET,
        )
        self._transports[broadcast_port] = transport
        logger.debug("udp bridge on port %s started", broadcast_port)

    self._is_running = True

stop() async ⚓︎

Stop the asynchronous bridge.

Source code in src/aioswitcher/bridge.py
async def stop(self) -> None:
    """Stop the asynchronous bridge."""
    for broadcast_port in self._broadcast_ports:
        transport = self._transports.get(broadcast_port)

        if transport and not transport.is_closing():
            logger.info("stopping the udp bridge on port %s", broadcast_port)
            transport.close()
        else:
            logger.info("udp bridge on port %s not started", broadcast_port)

    self._is_running = False

UdpClientProtocol ⚓︎

Bases: DatagramProtocol

Implementation of the Asyncio UDP DatagramProtocol.

Methods:

Name Description
connection_lost

Call on connection lost.

connection_made

Call on connection established.

datagram_received

Call on datagram received.

error_received

Call on exception received.

Source code in src/aioswitcher/bridge.py
@final
class UdpClientProtocol(DatagramProtocol):
    """Implementation of the Asyncio UDP DatagramProtocol."""

    def __init__(self, on_datagram: Callable[[bytes], None]) -> None:
        """Initialize the protocol."""
        self.transport: Optional[BaseTransport] = None
        self._on_datagram = on_datagram

    def connection_made(self, transport: BaseTransport) -> None:
        """Call on connection established."""
        self.transport = transport

    def datagram_received(self, data: bytes, addr: Tuple[Any, Any]) -> None:
        """Call on datagram received."""
        self._on_datagram(data)

    def error_received(self, exc: Optional[Exception]) -> None:
        """Call on exception received."""
        if exc:
            logger.error(f"udp client received error {exc}")
        else:
            warn("udp client received error")

    def connection_lost(self, exc: Optional[Exception]) -> None:
        """Call on connection lost."""
        if exc:
            logger.critical(f"udp bridge lost its connection {exc}")
        else:
            logger.info("udp connection stopped")

connection_lost(exc) ⚓︎

Call on connection lost.

Source code in src/aioswitcher/bridge.py
def connection_lost(self, exc: Optional[Exception]) -> None:
    """Call on connection lost."""
    if exc:
        logger.critical(f"udp bridge lost its connection {exc}")
    else:
        logger.info("udp connection stopped")

connection_made(transport) ⚓︎

Call on connection established.

Source code in src/aioswitcher/bridge.py
def connection_made(self, transport: BaseTransport) -> None:
    """Call on connection established."""
    self.transport = transport

datagram_received(data, addr) ⚓︎

Call on datagram received.

Source code in src/aioswitcher/bridge.py
def datagram_received(self, data: bytes, addr: Tuple[Any, Any]) -> None:
    """Call on datagram received."""
    self._on_datagram(data)

error_received(exc) ⚓︎

Call on exception received.

Source code in src/aioswitcher/bridge.py
def error_received(self, exc: Optional[Exception]) -> None:
    """Call on exception received."""
    if exc:
        logger.error(f"udp client received error {exc}")
    else:
        warn("udp client received error")

Switcher integration device module.

Modules:

Name Description
tools

Switcher integration device module tools.

DeviceCategory ⚓︎

Bases: Enum

Enum for relaying the device category.

Source code in src/aioswitcher/device/__init__.py
@unique
class DeviceCategory(Enum):
    """Enum for relaying the device category."""

    WATER_HEATER = auto()
    POWER_PLUG = auto()
    THERMOSTAT = auto()
    SHUTTER = auto()
    SINGLE_SHUTTER_DUAL_LIGHT = auto()
    DUAL_SHUTTER_SINGLE_LIGHT = auto()
    LIGHT = auto()

DeviceState ⚓︎

Bases: Enum

Enum class representing the device's state.

Source code in src/aioswitcher/device/__init__.py
@unique
class DeviceState(Enum):
    """Enum class representing the device's state."""

    ON = "01", "on"
    OFF = "00", "off"

    def __new__(cls, value: str, display: str) -> "DeviceState":
        """Override the default enum constructor and include extra properties."""
        new_enum = object.__new__(cls)
        new_enum._value = value  # type: ignore
        new_enum._display = display  # type: ignore
        return new_enum

    @property
    def display(self) -> str:
        """Return the display name of the state."""
        return self._display  # type: ignore

    @property
    def value(self) -> str:
        """Return the value of the state."""
        return self._value  # type: ignore

display: str property ⚓︎

Return the display name of the state.

value: str property ⚓︎

Return the value of the state.

DeviceType ⚓︎

Bases: Enum

Enum for relaying the type of the switcher devices.

Source code in src/aioswitcher/device/__init__.py
@unique
class DeviceType(Enum):
    """Enum for relaying the type of the switcher devices."""

    MINI = "Switcher Mini", "030f", 1, DeviceCategory.WATER_HEATER, False
    POWER_PLUG = "Switcher Power Plug", "01a8", 1, DeviceCategory.POWER_PLUG, False
    TOUCH = "Switcher Touch", "030b", 1, DeviceCategory.WATER_HEATER, False
    V2_ESP = "Switcher V2 (esp)", "01a7", 1, DeviceCategory.WATER_HEATER, False
    V2_QCA = "Switcher V2 (qualcomm)", "01a1", 1, DeviceCategory.WATER_HEATER, False
    V4 = "Switcher V4", "0317", 1, DeviceCategory.WATER_HEATER, False
    BREEZE = "Switcher Breeze", "0e01", 2, DeviceCategory.THERMOSTAT, False
    RUNNER = "Switcher Runner", "0c01", 2, DeviceCategory.SHUTTER, False
    RUNNER_MINI = "Switcher Runner Mini", "0c02", 2, DeviceCategory.SHUTTER, False
    RUNNER_S11 = (
        "Switcher Runner S11",
        "0f01",
        2,
        DeviceCategory.SINGLE_SHUTTER_DUAL_LIGHT,
        True,
    )
    RUNNER_S12 = (
        "Switcher Runner S12",
        "0f02",
        2,
        DeviceCategory.DUAL_SHUTTER_SINGLE_LIGHT,
        True,
    )
    LIGHT_SL01 = (
        "Switcher Light SL01",
        "0f04",
        2,
        DeviceCategory.LIGHT,
        True,
    )
    LIGHT_SL01_MINI = (
        "Switcher Light SL01 Mini",
        "0f07",
        2,
        DeviceCategory.LIGHT,
        True,
    )
    LIGHT_SL02 = (
        "Switcher Light SL02",
        "0f05",
        2,
        DeviceCategory.LIGHT,
        True,
    )
    LIGHT_SL02_MINI = (
        "Switcher Light SL02 Mini",
        "0f08",
        2,
        DeviceCategory.LIGHT,
        True,
    )
    LIGHT_SL03 = (
        "Switcher Light SL03",
        "0f06",
        2,
        DeviceCategory.LIGHT,
        True,
    )

    def __new__(
        cls,
        value: str,
        hex_rep: str,
        protocol_type: int,
        category: DeviceCategory,
        token_needed: bool,
    ) -> "DeviceType":
        """Override the default enum constructor and include extra properties."""
        new_enum = object.__new__(cls)
        new_enum._value = value  # type: ignore
        new_enum._hex_rep = hex_rep  # type: ignore
        new_enum._protocol_type = protocol_type  # type: ignore
        new_enum._category = category  # type: ignore
        new_enum._token_needed = token_needed  # type: ignore
        return new_enum

    @property
    def value(self) -> str:
        """Return the value of the state."""
        return self._value  # type: ignore

    @property
    def hex_rep(self) -> str:
        """Return the hexadecimal representation of the device type."""
        return self._hex_rep  # type: ignore

    @property
    def protocol_type(self) -> int:
        """Return the protocol type of the device."""
        return self._protocol_type  # type: ignore

    @property
    def category(self) -> DeviceCategory:
        """Return the category of the device type."""
        return self._category  # type: ignore

    @property
    def token_needed(self) -> bool:
        """Return true if token in needed for the device."""
        return self._token_needed  # type: ignore

category: DeviceCategory property ⚓︎

Return the category of the device type.

hex_rep: str property ⚓︎

Return the hexadecimal representation of the device type.

protocol_type: int property ⚓︎

Return the protocol type of the device.

token_needed: bool property ⚓︎

Return true if token in needed for the device.

value: str property ⚓︎

Return the value of the state.

ShutterChildLock ⚓︎

Bases: Enum

Enum class representing the shutter device's child lock state.

Source code in src/aioswitcher/device/__init__.py
@final
class ShutterChildLock(Enum):
    """Enum class representing the shutter device's child lock state."""

    ON = "01", "on"
    OFF = "00", "off"

    def __new__(cls, value: str, display: str) -> "ShutterChildLock":
        """Override the default enum constructor and include extra properties."""
        new_enum = object.__new__(cls)
        new_enum._value = value  # type: ignore
        new_enum._display = display  # type: ignore
        return new_enum

    @property
    def display(self) -> str:
        """Return the display name of the shutter child lock."""
        return self._display  # type: ignore

    @property
    def value(self) -> str:
        """Return the value of the shutter child lock."""
        return self._value  # type: ignore

display: str property ⚓︎

Return the display name of the shutter child lock.

value: str property ⚓︎

Return the value of the shutter child lock.

ShutterDirection ⚓︎

Bases: Enum

Enum class representing the shutter device's position.

Source code in src/aioswitcher/device/__init__.py
@final
class ShutterDirection(Enum):
    """Enum class representing the shutter device's position."""

    SHUTTER_STOP = "0000", "stop"
    SHUTTER_UP = "0100", "up"
    SHUTTER_DOWN = "0001", "down"

    def __new__(cls, value: str, display: str) -> "ShutterDirection":
        """Override the default enum constructor and include extra properties."""
        new_enum = object.__new__(cls)
        new_enum._value = value  # type: ignore
        new_enum._display = display  # type: ignore
        return new_enum

    @property
    def display(self) -> str:
        """Return the display name of the direction."""
        return self._display  # type: ignore

    @property
    def value(self) -> str:
        """Return the value of the direction."""
        return self._value  # type: ignore

display: str property ⚓︎

Return the display name of the direction.

value: str property ⚓︎

Return the value of the direction.

SwitcherBase dataclass ⚓︎

Bases: ABC

Abstraction for all switcher devices.

Parameters:

Name Type Description Default
device_type DeviceType

the DeviceType appropriate member.

required
device_state DeviceState

the DeviceState appropriate member.

required
device_id str

the id retrieved from the device.

required
device_key str

the login key of the device.

required
ip_address str

the ip address assigned to the device.

required
mac_address str

the mac address assigned to the device.

required
name str

the name of the device.

required
Source code in src/aioswitcher/device/__init__.py
@dataclass
class SwitcherBase(ABC):
    """Abstraction for all switcher devices.

    Args:
        device_type: the DeviceType appropriate member.
        device_state: the DeviceState appropriate member.
        device_id: the id retrieved from the device.
        device_key: the login key of the device.
        ip_address: the ip address assigned to the device.
        mac_address: the mac address assigned to the device.
        name: the name of the device.

    """

    device_type: DeviceType
    device_state: DeviceState
    device_id: str
    device_key: str
    ip_address: str
    mac_address: str
    name: str
    token_needed: bool
    last_data_update: datetime = field(init=False)

    def __post_init__(self) -> None:
        """Post initialization, set last_data_update to the instantiation datetime."""
        self.last_data_update = datetime.now()

SwitcherDualShutterSingleLight dataclass ⚓︎

Bases: SwitcherDualShutterSingleLightBase, SwitcherBase

Implementation of the Switcher dual Shutter with single light device.

Source code in src/aioswitcher/device/__init__.py
@final
@dataclass
class SwitcherDualShutterSingleLight(SwitcherDualShutterSingleLightBase, SwitcherBase):
    """Implementation of the Switcher dual Shutter with single light device."""

    def __post_init__(self) -> None:
        """Post initialization.

        Validate device type category as DUAL_SHUTTER_SINGLE_LIGHT.
        """
        if self.device_type.category != DeviceCategory.DUAL_SHUTTER_SINGLE_LIGHT:
            raise ValueError("only dual shutters with single lights are allowed")
        return super().__post_init__()

SwitcherDualShutterSingleLightBase dataclass ⚓︎

Bases: ABC

Abstraction for all switcher devices controlling dual shutter with single light.

Parameters:

Name Type Description Default
position List[int]

the current array of position of the shutter (integer percentage).

required
direction List[ShutterDirection]

the current array of direction of the shutter.

required
light List[DeviceState]

the current array of light state.

required
Source code in src/aioswitcher/device/__init__.py
@dataclass
class SwitcherDualShutterSingleLightBase(ABC):
    """Abstraction for all switcher devices controlling dual shutter with single light.

    Args:
        position: the current array of position of the shutter (integer percentage).
        direction: the current array of direction of the shutter.
        light: the current array of light state.
    """

    position: List[int]
    direction: List[ShutterDirection]
    child_lock: List[ShutterChildLock]
    light: List[DeviceState]

SwitcherLight dataclass ⚓︎

Bases: SwitcherLightBase, SwitcherBase

Implementation of the Switcher Light device.

Source code in src/aioswitcher/device/__init__.py
@final
@dataclass
class SwitcherLight(SwitcherLightBase, SwitcherBase):
    """Implementation of the Switcher Light device."""

    def __post_init__(self) -> None:
        """Post initialization validate device type category as LIGHT."""
        if self.device_type.category != DeviceCategory.LIGHT:
            raise ValueError("only lights are allowed")
        return super().__post_init__()

SwitcherLightBase dataclass ⚓︎

Bases: ABC

Abstraction for all switcher devices controlling light.

Parameters:

Name Type Description Default
light List[DeviceState]

the current array of light state.

required
Source code in src/aioswitcher/device/__init__.py
@dataclass
class SwitcherLightBase(ABC):
    """Abstraction for all switcher devices controlling light.

    Args:
        light: the current array of light state.
    """

    light: List[DeviceState]

SwitcherPowerBase dataclass ⚓︎

Bases: ABC

Abstraction for all switcher devices reporting power data.

Parameters:

Name Type Description Default
power_consumption int

the current power consumption in watts.

required
electric_current float

the current power consumption in amps.

required
Source code in src/aioswitcher/device/__init__.py
@dataclass
class SwitcherPowerBase(ABC):
    """Abstraction for all switcher devices reporting power data.

    Args:
        power_consumption: the current power consumption in watts.
        electric_current: the current power consumption in amps.

    """

    power_consumption: int
    electric_current: float

SwitcherPowerPlug dataclass ⚓︎

Bases: SwitcherPowerBase, SwitcherBase

Implementation of the Switcher Power Plug device.

Please Note the order of the inherited classes to understand the order of the instantiation parameters and the super call.

Source code in src/aioswitcher/device/__init__.py
@final
@dataclass
class SwitcherPowerPlug(SwitcherPowerBase, SwitcherBase):
    """Implementation of the Switcher Power Plug device.

    Please Note the order of the inherited classes to understand the order of the
    instantiation parameters and the super call.
    """

    def __post_init__(self) -> None:
        """Post initialization validate device type category as POWER_PLUG."""
        if self.device_type.category != DeviceCategory.POWER_PLUG:
            raise ValueError("only power plugs are allowed")
        super().__post_init__()

SwitcherShutter dataclass ⚓︎

Bases: SwitcherShutterBase, SwitcherBase

Implementation of the Switcher Shutter device.

Source code in src/aioswitcher/device/__init__.py
@final
@dataclass
class SwitcherShutter(SwitcherShutterBase, SwitcherBase):
    """Implementation of the Switcher Shutter device."""

    def __post_init__(self) -> None:
        """Post initialization validate device type category as SHUTTER."""
        if self.device_type.category != DeviceCategory.SHUTTER:
            raise ValueError("only shutters are allowed")
        return super().__post_init__()

SwitcherShutterBase dataclass ⚓︎

Bases: ABC

Abstraction for all switcher devices controlling shutter.

Parameters:

Name Type Description Default
position List[int]

the current array of position of the shutter (integer percentage).

required
direction List[ShutterDirection]

the current array of direction of the shutter.

required
Source code in src/aioswitcher/device/__init__.py
@dataclass
class SwitcherShutterBase(ABC):
    """Abstraction for all switcher devices controlling shutter.

    Args:
        position: the current array of position of the shutter (integer percentage).
        direction: the current array of direction of the shutter.
    """

    position: List[int]
    direction: List[ShutterDirection]
    child_lock: List[ShutterChildLock]

SwitcherSingleShutterDualLight dataclass ⚓︎

Bases: SwitcherSingleShutterDualLightBase, SwitcherBase

Implementation of the Switcher Shutter with dual light device.

Source code in src/aioswitcher/device/__init__.py
@final
@dataclass
class SwitcherSingleShutterDualLight(SwitcherSingleShutterDualLightBase, SwitcherBase):
    """Implementation of the Switcher Shutter with dual light device."""

    def __post_init__(self) -> None:
        """Post initialization.

        Validate device type category as SINGLE_SHUTTER_DUAL_LIGHT.
        """
        if self.device_type.category != DeviceCategory.SINGLE_SHUTTER_DUAL_LIGHT:
            raise ValueError("only shutters with dual lights are allowed")
        return super().__post_init__()

SwitcherSingleShutterDualLightBase dataclass ⚓︎

Bases: ABC

Abstraction for all switcher devices controlling shutter with dual light.

Parameters:

Name Type Description Default
position List[int]

the current array of position of the shutter (integer percentage).

required
direction List[ShutterDirection]

the current array of direction of the shutter.

required
light List[DeviceState]

the current array of light state.

required
Source code in src/aioswitcher/device/__init__.py
@dataclass
class SwitcherSingleShutterDualLightBase(ABC):
    """Abstraction for all switcher devices controlling shutter with dual light.

    Args:
        position: the current array of position of the shutter (integer percentage).
        direction: the current array of direction of the shutter.
        light: the current array of light state.
    """

    position: List[int]
    direction: List[ShutterDirection]
    child_lock: List[ShutterChildLock]
    light: List[DeviceState]

SwitcherThermostat dataclass ⚓︎

Bases: SwitcherThermostatBase, SwitcherBase

Implementation of the Switcher Thermostat device.

Source code in src/aioswitcher/device/__init__.py
@final
@dataclass
class SwitcherThermostat(SwitcherThermostatBase, SwitcherBase):
    """Implementation of the Switcher Thermostat device."""

    def __post_init__(self) -> None:
        """Post initialization validate device type category as THERMOSTAT."""
        if self.device_type.category != DeviceCategory.THERMOSTAT:
            raise ValueError("only thermostats are allowed")
        self.remote = None
        return super().__post_init__()

SwitcherThermostatBase dataclass ⚓︎

Bases: ABC

Abstraction for switcher thermostat devices.

Parameters:

Name Type Description Default
mode ThermostatMode

the mode of the thermostat.

required
temperature float

the current temperature in celsius.

required
target_temperature int

the current target temperature in celsius.

required
fan_level ThermostatFanLevel

the current fan level in celsius.

required
swing ThermostatSwing

the current swing state.

required
remote_id str

the id of the remote used to control this thermostat

required
Source code in src/aioswitcher/device/__init__.py
@dataclass
class SwitcherThermostatBase(ABC):
    """Abstraction for switcher thermostat devices.

    Args:
        mode: the mode of the thermostat.
        temperature: the current temperature in celsius.
        target_temperature: the current target temperature in celsius.
        fan_level: the current fan level in celsius.
        swing: the current swing state.
        remote_id: the id of the remote used to control this thermostat
    """

    mode: ThermostatMode
    temperature: float
    target_temperature: int
    fan_level: ThermostatFanLevel
    swing: ThermostatSwing
    remote_id: str

SwitcherTimedBase dataclass ⚓︎

Bases: ABC

Abstraction for all switcher devices supporting timed operations.

Parameters:

Name Type Description Default
remaining_time str

remaining time to current run.

required
auto_shutdown str

configured value for auto shutdown.

required
Source code in src/aioswitcher/device/__init__.py
@dataclass
class SwitcherTimedBase(ABC):
    """Abstraction for all switcher devices supporting timed operations.

    Args:
        remaining_time: remaining time to current run.
        auto_shutdown: configured value for auto shutdown.

    """

    remaining_time: str
    auto_shutdown: str

    @property
    def auto_off_set(self) -> str:
        """Fix for backward compatibility issues with home assistant."""
        return self.auto_shutdown

auto_off_set: str property ⚓︎

Fix for backward compatibility issues with home assistant.

SwitcherWaterHeater dataclass ⚓︎

Bases: SwitcherTimedBase, SwitcherPowerBase, SwitcherBase

Implementation of the Switcher Water Heater device.

Please Note the order of the inherited classes to understand the order of the instantiation parameters and the super call.

Source code in src/aioswitcher/device/__init__.py
@final
@dataclass
class SwitcherWaterHeater(SwitcherTimedBase, SwitcherPowerBase, SwitcherBase):
    """Implementation of the Switcher Water Heater device.

    Please Note the order of the inherited classes to understand the order of the
    instantiation parameters and the super call.
    """

    def __post_init__(self) -> None:
        """Post initialization validate device type category as WATER_HEATER."""
        if self.device_type.category != DeviceCategory.WATER_HEATER:
            raise ValueError("only water heaters are allowed")
        super().__post_init__()

ThermostatFanLevel ⚓︎

Bases: Enum

Enum class representing the thermostat device's fan level.

Source code in src/aioswitcher/device/__init__.py
class ThermostatFanLevel(Enum):
    """Enum class representing the thermostat device's fan level."""

    LOW = "1", "low"
    MEDIUM = "2", "medium"
    HIGH = "3", "high"
    AUTO = "0", "auto"

    def __new__(cls, value: str, display: str) -> "ThermostatFanLevel":
        """Override the default enum constructor and include extra properties."""
        new_enum = object.__new__(cls)
        new_enum._value = value  # type: ignore
        new_enum._display = display  # type: ignore
        return new_enum

    @property
    def display(self) -> str:
        """Return the display name of the fan level."""
        return self._display  # type: ignore

    @property
    def value(self) -> str:
        """Return the value of the fan level."""
        return self._value  # type: ignore

display: str property ⚓︎

Return the display name of the fan level.

value: str property ⚓︎

Return the value of the fan level.

ThermostatMode ⚓︎

Bases: Enum

Enum class representing the thermostat device's position.

Source code in src/aioswitcher/device/__init__.py
class ThermostatMode(Enum):
    """Enum class representing the thermostat device's position."""

    AUTO = "01", "auto"
    DRY = "02", "dry"
    FAN = "03", "fan"
    COOL = "04", "cool"
    HEAT = "05", "heat"

    def __new__(cls, value: str, display: str) -> "ThermostatMode":
        """Override the default enum constructor and include extra properties."""
        new_enum = object.__new__(cls)
        new_enum._value = value  # type: ignore
        new_enum._display = display  # type: ignore
        return new_enum

    @property
    def display(self) -> str:
        """Return the display name of the mode."""
        return self._display  # type: ignore

    @property
    def value(self) -> str:
        """Return the value of the mode."""
        return self._value  # type: ignore

display: str property ⚓︎

Return the display name of the mode.

value: str property ⚓︎

Return the value of the mode.

ThermostatSwing ⚓︎

Bases: Enum

Enum class representing the thermostat device's swing state.

Source code in src/aioswitcher/device/__init__.py
class ThermostatSwing(Enum):
    """Enum class representing the thermostat device's swing state."""

    OFF = "0", "off"
    ON = "1", "on"

    def __new__(cls, value: str, display: str) -> "ThermostatSwing":
        """Override the default enum constructor and include extra properties."""
        new_enum = object.__new__(cls)
        new_enum._value = value  # type: ignore
        new_enum._display = display  # type: ignore
        return new_enum

    @property
    def display(self) -> str:
        """Return the display name of the swing."""
        return self._display  # type: ignore

    @property
    def value(self) -> str:
        """Return the value of the swing."""
        return self._value  # type: ignore

display: str property ⚓︎

Return the display name of the swing.

value: str property ⚓︎

Return the value of the swing.

Switcher integration device module tools.

Functions:

Name Description
convert_token_to_packet

Convert a token to token packet.

current_timestamp_to_hexadecimal

Generate hexadecimal representation of the current timestamp.

get_light_api_packet_index

Return the correct light api packet index.

get_light_discovery_packet_index

Return the correct light discovery packet index.

get_shutter_api_packet_index

Return the correct shutter api packet index.

get_shutter_discovery_packet_index

Return the correct shutter discovery packet index.

minutes_to_hexadecimal_seconds

Encode minutes to an hexadecimal packed as little endian unsigned int.

seconds_to_iso_time

Convert seconds to iso time.

set_message_length

Set the message length.

sign_packet_with_crc_key

Sign the packets with the designated crc key.

string_to_hexadecimale_device_name

Encode string device name to an appropriate hexadecimal value.

timedelta_to_hexadecimal_seconds

Encode timedelta as seconds to an hexadecimal packed as little endian unsigned.

validate_token

Make an asynchronous API call to validate a Token by username and token.

watts_to_amps

Convert power consumption to watts to electric current in amps.

convert_token_to_packet(token) ⚓︎

Convert a token to token packet.

Parameters:

Name Type Description Default
token str

the token of the user sent by Email

required
Return

Token packet if token is valid, otherwise empty string or raise error.

Source code in src/aioswitcher/device/tools.py
def convert_token_to_packet(token: str) -> str:
    """Convert a token to token packet.

    Args:
        token: the token of the user sent by Email

    Return:
        Token packet if token is valid,
        otherwise empty string or raise error.

    """
    try:
        token_key = b"jzNrAOjc%lpg3pVr5cF!5Le06ZgOdWuJ"
        encrypted_value = b64decode(bytes(token, "utf-8"))
        cipher = AES.new(token_key, AES.MODE_ECB)
        decrypted_value = cipher.decrypt(encrypted_value)
        unpadded_decrypted_value = unpad(decrypted_value, AES.block_size)
        return hexlify(unpadded_decrypted_value).decode()
    except (KeyError, ValueError) as ve:
        raise RuntimeError("convert token to packet was not successful") from ve

current_timestamp_to_hexadecimal() ⚓︎

Generate hexadecimal representation of the current timestamp.

Return

Hexadecimal representation of the current unix time retrieved by time.time.

Source code in src/aioswitcher/device/tools.py
def current_timestamp_to_hexadecimal() -> str:
    """Generate hexadecimal representation of the current timestamp.

    Return:
        Hexadecimal representation of the current unix time retrieved by ``time.time``.

    """
    round_timestamp = int(round(time.time()))
    binary_timestamp = pack("<I", round_timestamp)
    hex_timestamp = hexlify(binary_timestamp)
    return hex_timestamp.decode()

get_light_api_packet_index(device_type, circuit_number) ⚓︎

Return the correct light api packet index.

Used in sending the light on/off status with the packet (based of device type and circuit number).

Source code in src/aioswitcher/device/tools.py
def get_light_api_packet_index(device_type: DeviceType, circuit_number: int) -> int:
    """Return the correct light api packet index.

    Used in sending the light on/off status with the packet
    (based of device type and circuit number).
    """
    # We need to convert selected circuit number to actual place in the packet.
    # That is why we add + 1
    return get_light_discovery_packet_index(device_type, circuit_number) + 1

get_light_discovery_packet_index(device_type, circuit_number) ⚓︎

Return the correct light discovery packet index.

Used in retriving the light on/off status from the packet (based of device type and circuit number).

Source code in src/aioswitcher/device/tools.py
def get_light_discovery_packet_index(
    device_type: DeviceType, circuit_number: int
) -> int:
    """Return the correct light discovery packet index.

    Used in retriving the light on/off status from the packet
    (based of device type and circuit number).
    """
    if device_type == DeviceType.LIGHT_SL03:
        if circuit_number not in [0, 1, 2]:
            raise ValueError("Invalid circuit number")
        return circuit_number
    if device_type in (
        DeviceType.RUNNER_S11,
        DeviceType.LIGHT_SL02,
        DeviceType.LIGHT_SL02_MINI,
    ):
        if circuit_number not in [0, 1]:
            raise ValueError("Invalid circuit number")
        return circuit_number
    if device_type in (
        DeviceType.RUNNER_S12,
        DeviceType.LIGHT_SL01,
        DeviceType.LIGHT_SL01_MINI,
    ):
        if circuit_number != 0:
            raise ValueError("Invalid circuit number")
        return 0

    raise ValueError("only devices that has lights are allowed")

get_shutter_api_packet_index(device_type, circuit_number) ⚓︎

Return the correct shutter api packet index.

Used in sending the shutter position/direction with the packet (based of device type and circuit number).

Source code in src/aioswitcher/device/tools.py
def get_shutter_api_packet_index(device_type: DeviceType, circuit_number: int) -> int:
    """Return the correct shutter api packet index.

    Used in sending the shutter position/direction with the packet
    (based of device type and circuit number).
    """
    # We need to convert selected circuit number to actual place in the packet.
    # That is why we add + 1
    return get_shutter_discovery_packet_index(device_type, circuit_number) + 1

get_shutter_discovery_packet_index(device_type, circuit_number) ⚓︎

Return the correct shutter discovery packet index.

Used in retriving the shutter position/direction from the packet (based of device type and circuit number).

Source code in src/aioswitcher/device/tools.py
def get_shutter_discovery_packet_index(
    device_type: DeviceType, circuit_number: int
) -> int:
    """Return the correct shutter discovery packet index.

    Used in retriving the shutter position/direction from the packet
    (based of device type and circuit number).
    """
    if device_type != DeviceType.RUNNER_S12 and circuit_number != 0:
        raise ValueError("Invalid circuit number")
    if device_type == DeviceType.RUNNER_S12 and circuit_number not in [0, 1]:
        raise ValueError("Invalid circuit number")

    if device_type in (DeviceType.RUNNER, DeviceType.RUNNER_MINI):
        return 0
    elif device_type == DeviceType.RUNNER_S11:
        return 2
    elif device_type == DeviceType.RUNNER_S12:
        return circuit_number + 1

    raise ValueError("only shutters are allowed")

minutes_to_hexadecimal_seconds(minutes) ⚓︎

Encode minutes to an hexadecimal packed as little endian unsigned int.

Parameters:

Name Type Description Default
minutes int

minutes to encode.

required
Return

Hexadecimal representation of the minutes argument.

Source code in src/aioswitcher/device/tools.py
def minutes_to_hexadecimal_seconds(minutes: int) -> str:
    """Encode minutes to an hexadecimal packed as little endian unsigned int.

    Args:
        minutes: minutes to encode.

    Return:
        Hexadecimal representation of the minutes argument.

    """
    return hexlify(pack("<I", minutes * 60)).decode()

seconds_to_iso_time(all_seconds) ⚓︎

Convert seconds to iso time.

Parameters:

Name Type Description Default
all_seconds int

the total number of seconds to convert.

required
Return

A string representing the converted iso time in %H:%M:%S format. e.g. "02:24:37".

Source code in src/aioswitcher/device/tools.py
def seconds_to_iso_time(all_seconds: int) -> str:
    """Convert seconds to iso time.

    Args:
        all_seconds: the total number of seconds to convert.

    Return:
        A string representing the converted iso time in %H:%M:%S format.
        e.g. "02:24:37".

    """
    minutes, seconds = divmod(int(all_seconds), 60)
    hours, minutes = divmod(minutes, 60)

    return datetime.time(hour=hours, minute=minutes, second=seconds).isoformat()

set_message_length(message) ⚓︎

Set the message length.

Source code in src/aioswitcher/device/tools.py
def set_message_length(message: str) -> str:
    """Set the message length."""
    length = "{:x}".format(len(unhexlify(message + "00000000"))).ljust(4, "0")
    return "fef0" + str(length) + message[8:]

sign_packet_with_crc_key(hex_packet) ⚓︎

Sign the packets with the designated crc key.

Parameters:

Name Type Description Default
hex_packet str

packet to sign.

required
Return

The calculated and signed packet.

Source code in src/aioswitcher/device/tools.py
def sign_packet_with_crc_key(hex_packet: str) -> str:
    """Sign the packets with the designated crc key.

    Args:
        hex_packet: packet to sign.

    Return:
        The calculated and signed packet.

    """
    binary_packet = unhexlify(hex_packet)
    binary_packet_crc = pack(">I", crc_hqx(binary_packet, 0x1021))
    hex_packet_crc = hexlify(binary_packet_crc).decode()
    hex_packet_crc_sliced = hex_packet_crc[6:8] + hex_packet_crc[4:6]

    binary_key = unhexlify(hex_packet_crc_sliced + "30" * 32)
    binary_key_crc = pack(">I", crc_hqx(binary_key, 0x1021))
    hex_key_crc = hexlify(binary_key_crc).decode()
    hex_key_crc_sliced = hex_key_crc[6:8] + hex_key_crc[4:6]

    return hex_packet + hex_packet_crc_sliced + hex_key_crc_sliced

string_to_hexadecimale_device_name(name) ⚓︎

Encode string device name to an appropriate hexadecimal value.

Parameters:

Name Type Description Default
name str

the desired name for encoding.

required
Return

Hexadecimal representation of the name argument.

Source code in src/aioswitcher/device/tools.py
def string_to_hexadecimale_device_name(name: str) -> str:
    """Encode string device name to an appropriate hexadecimal value.

    Args:
        name: the desired name for encoding.

    Return:
        Hexadecimal representation of the name argument.

    """
    length = len(name)
    if 1 < length < 33:
        hex_name = hexlify(name.encode())
        zeros_pad = ("00" * (32 - length)).encode()
        return (hex_name + zeros_pad).decode()
    raise ValueError("name length can vary from 2 to 32")

timedelta_to_hexadecimal_seconds(full_time) ⚓︎

Encode timedelta as seconds to an hexadecimal packed as little endian unsigned.

Parameters:

Name Type Description Default
full_time timedelta

timedelta time between 1 and 24 hours, seconds are ignored.

required
Return

Hexadecimal representation of the seconds built fom the full_time argument.

Source code in src/aioswitcher/device/tools.py
def timedelta_to_hexadecimal_seconds(full_time: datetime.timedelta) -> str:
    """Encode timedelta as seconds to an hexadecimal packed as little endian unsigned.

    Args:
        full_time: timedelta time between 1 and 24 hours, seconds are ignored.

    Return:
        Hexadecimal representation of the seconds built fom the full_time argument.

    """
    minutes = full_time.total_seconds() / 60
    hours, minutes = divmod(minutes, 60)
    seconds = int(hours) * 3600 + int(minutes) * 60

    if 3599 < seconds < 86341:
        return hexlify(pack("<I", int(seconds))).decode()

    raise ValueError("can only handle 1 to 24 hours")

validate_token(username, token) async ⚓︎

Make an asynchronous API call to validate a Token by username and token.

Source code in src/aioswitcher/device/tools.py
async def validate_token(username: str, token: str) -> bool:
    """Make an asynchronous API call to validate a Token by username and token."""
    request_url = "https://switcher.co.il/ValidateToken/"
    request_data = {"email": username, "token": token}
    is_token_valid = False
    # Preload the SSL context
    ssl_context = ssl.SSLContext()

    logger.debug("calling API call for Switcher to validate the token")

    async with aiohttp.ClientSession() as session:
        async with session.post(
            request_url, data=request_data, ssl=ssl_context
        ) as response:
            if response.status == 200:
                logger.debug("request successful")
                try:
                    response_json = await response.json()
                    result = response_json.get("result", None)
                    if result is not None:
                        is_token_valid = result.lower() == "true"
                except aiohttp.ContentTypeError:
                    logger.debug("response content is not valid JSON")
            else:
                logger.debug("request failed with status code: %s", response.status)

    return is_token_valid

watts_to_amps(watts) ⚓︎

Convert power consumption to watts to electric current in amps.

Source code in src/aioswitcher/device/tools.py
def watts_to_amps(watts: int) -> float:
    """Convert power consumption to watts to electric current in amps."""
    return round((watts / float(220)), 1)

Switcher integration schedule module.

Modules:

Name Description
parser

Switcher integration schedule parser module.

tools

Switcher integration schedule module tools.

Days ⚓︎

Bases: Enum

Enum class representing the day entity.

Source code in src/aioswitcher/schedule/__init__.py
@unique
class Days(Enum):
    """Enum class representing the day entity."""

    MONDAY = ("Monday", 0x02, 2, 0)
    TUESDAY = ("Tuesday", 0x04, 4, 1)
    WEDNESDAY = ("Wednesday", 0x08, 8, 2)
    THURSDAY = ("Thursday", 0x10, 16, 3)
    FRIDAY = ("Friday", 0x20, 32, 4)
    SATURDAY = ("Saturday", 0x40, 64, 5)
    SUNDAY = ("Sunday", 0x80, 128, 6)

    def __new__(cls, value: str, hex_rep: int, bit_rep: int, weekday: int) -> "Days":
        """Override the default enum constructor and include extra properties."""
        new_enum = object.__new__(cls)
        new_enum._value_ = value
        new_enum._hex_rep = hex_rep  # type: ignore
        new_enum._bit_rep = bit_rep  # type: ignore
        new_enum._weekday = weekday  # type: ignore
        return new_enum

    @property
    def bit_rep(self) -> int:
        """Return the bit representation of the day."""
        return self._bit_rep  # type: ignore

    @property
    def hex_rep(self) -> int:
        """Return the hexadecimal representation of the day."""
        return self._hex_rep  # type: ignore

    @property
    def weekday(self) -> int:
        """Return the weekday of the day."""
        return self._weekday  # type: ignore

bit_rep: int property ⚓︎

Return the bit representation of the day.

hex_rep: int property ⚓︎

Return the hexadecimal representation of the day.

weekday: int property ⚓︎

Return the weekday of the day.

ScheduleState ⚓︎

Bases: Enum

Enum representing the status of the schedule.

Source code in src/aioswitcher/schedule/__init__.py
@unique
class ScheduleState(Enum):
    """Enum representing the status of the schedule."""

    ENABLED = "01"
    DISABLED = "00"

Switcher integration schedule parser module.

Modules:

Name Description
tools

Switcher integration schedule module tools.

Functions:

Name Description
get_schedules

Use to create a list of schedule from a response message from the device.

ScheduleParser dataclass ⚓︎

Schedule parsing tool.

Methods:

Name Description
get_days

Retun a set of the scheduled Days.

get_end_time

Return the schedule end time in %H:%M format.

get_id

Return the id of the schedule.

get_start_time

Return the schedule start time in %H:%M format.

get_state

Return the current state of the device.

is_enabled

Return true if enbaled.

is_recurring

Return true if a recurring schedule.

Source code in src/aioswitcher/schedule/parser.py
@final
@dataclass(frozen=True)
class ScheduleParser:
    """Schedule parsing tool."""

    schedule: bytes

    def get_id(self) -> str:
        """Return the id of the schedule."""
        return str(int(self.schedule[0:2], 16))

    def is_enabled(self) -> bool:
        """Return true if enbaled."""
        return int(self.schedule[2:4], 16) == 1

    def is_recurring(self) -> bool:
        """Return true if a recurring schedule."""
        return self.schedule[4:6] != b"00"

    def get_days(self) -> Set[Days]:
        """Retun a set of the scheduled Days."""
        return (
            tools.bit_summary_to_days(int(self.schedule[4:6], 16))
            if self.is_recurring()
            else set()
        )

    def get_state(self) -> ScheduleState:
        """Return the current state of the device.

        Not sure if this needs to be included in the schedule object.
        """
        return ScheduleState(self.schedule[6:8].decode())

    def get_start_time(self) -> str:
        """Return the schedule start time in %H:%M format."""
        return tools.hexadecimale_timestamp_to_localtime(self.schedule[8:16])

    def get_end_time(self) -> str:
        """Return the schedule end time in %H:%M format."""
        return tools.hexadecimale_timestamp_to_localtime(self.schedule[16:24])

get_days() ⚓︎

Retun a set of the scheduled Days.

Source code in src/aioswitcher/schedule/parser.py
def get_days(self) -> Set[Days]:
    """Retun a set of the scheduled Days."""
    return (
        tools.bit_summary_to_days(int(self.schedule[4:6], 16))
        if self.is_recurring()
        else set()
    )

get_end_time() ⚓︎

Return the schedule end time in %H:%M format.

Source code in src/aioswitcher/schedule/parser.py
def get_end_time(self) -> str:
    """Return the schedule end time in %H:%M format."""
    return tools.hexadecimale_timestamp_to_localtime(self.schedule[16:24])

get_id() ⚓︎

Return the id of the schedule.

Source code in src/aioswitcher/schedule/parser.py
def get_id(self) -> str:
    """Return the id of the schedule."""
    return str(int(self.schedule[0:2], 16))

get_start_time() ⚓︎

Return the schedule start time in %H:%M format.

Source code in src/aioswitcher/schedule/parser.py
def get_start_time(self) -> str:
    """Return the schedule start time in %H:%M format."""
    return tools.hexadecimale_timestamp_to_localtime(self.schedule[8:16])

get_state() ⚓︎

Return the current state of the device.

Not sure if this needs to be included in the schedule object.

Source code in src/aioswitcher/schedule/parser.py
def get_state(self) -> ScheduleState:
    """Return the current state of the device.

    Not sure if this needs to be included in the schedule object.
    """
    return ScheduleState(self.schedule[6:8].decode())

is_enabled() ⚓︎

Return true if enbaled.

Source code in src/aioswitcher/schedule/parser.py
def is_enabled(self) -> bool:
    """Return true if enbaled."""
    return int(self.schedule[2:4], 16) == 1

is_recurring() ⚓︎

Return true if a recurring schedule.

Source code in src/aioswitcher/schedule/parser.py
def is_recurring(self) -> bool:
    """Return true if a recurring schedule."""
    return self.schedule[4:6] != b"00"

SwitcherSchedule dataclass ⚓︎

representation of the Switcher schedule slot.

Parameters:

Name Type Description Default
schedule_id str

the id of the schedule

required
recurring bool

is a recurring schedule

required
days Set[Days]

a set of schedule days, or empty set for non recurring schedules

required
start_time str

the start time of the schedule

required
end_time str

the end time of the schedule

required
Source code in src/aioswitcher/schedule/parser.py
@final
@dataclass
class SwitcherSchedule:
    """representation of the Switcher schedule slot.

    Args:
        schedule_id: the id of the schedule
        recurring: is a recurring schedule
        days: a set of schedule days, or empty set for non recurring schedules
        start_time: the start time of the schedule
        end_time: the end time of the schedule

    """

    schedule_id: str
    recurring: bool
    days: Set[Days]
    start_time: str
    end_time: str
    duration: str = field(init=False)
    display: str = field(init=False)

    def __post_init__(self) -> None:
        """Post initialization, set duration and display."""
        self.duration = tools.calc_duration(self.start_time, self.end_time)
        self.display = tools.pretty_next_run(self.start_time, self.days)

    def __hash__(self) -> int:
        """For usage with set, implementation of the __hash__ magic method."""
        return hash(self.schedule_id)

    def __eq__(self, obj: object) -> bool:
        """For usage with set, implementation of the __eq__ magic method."""
        if isinstance(obj, SwitcherSchedule):
            return self.schedule_id == obj.schedule_id
        return False

get_schedules(message) ⚓︎

Use to create a list of schedule from a response message from the device.

Source code in src/aioswitcher/schedule/parser.py
def get_schedules(message: bytes) -> Set[SwitcherSchedule]:
    """Use to create a list of schedule from a response message from the device."""
    hex_data = hexlify(message)[90:-8].decode()
    hex_data_split = wrap(hex_data, 32)
    ret_set = set()
    for schedule in hex_data_split:
        parser = ScheduleParser(schedule.encode())
        ret_set.add(
            SwitcherSchedule(
                parser.get_id(),
                parser.is_recurring(),
                parser.get_days(),
                parser.get_start_time(),
                parser.get_end_time(),
            )
        )
    return ret_set

Switcher integration schedule module tools.

Functions:

Name Description
bit_summary_to_days

Decode a weekdays bit summary to a set of weekdays.

calc_duration

Use to calculate the delta between two time values formated as %H:%M.

hexadecimale_timestamp_to_localtime

Decode an hexadecimale timestamp to localtime with the format %H:%M.

pretty_next_run

Create a literal for displaying the next run time.

time_to_hexadecimal_timestamp

Convert hours and minutes to a timestamp with the current date and encode.

weekdays_to_hexadecimal

Sum the requested weekdays bit representation and return as hexadecimal value.

bit_summary_to_days(sum_weekdays_bit) ⚓︎

Decode a weekdays bit summary to a set of weekdays.

Parameters:

Name Type Description Default
sum_weekdays_bit int

the sum of all weekdays

required
Return

Set of Weekday members decoded from the summary value.

Todo

Should an existing remainder in the sum value throw an error? E.g. 3 will result in a set of MONDAY and the remainder will be 1.

Source code in src/aioswitcher/schedule/tools.py
def bit_summary_to_days(sum_weekdays_bit: int) -> Set[Days]:
    """Decode a weekdays bit summary to a set of weekdays.

    Args:
        sum_weekdays_bit: the sum of all weekdays

    Return:
        Set of Weekday members decoded from the summary value.

    Todo:
        Should an existing remainder in the sum value throw an error?
        E.g. 3 will result in a set of MONDAY and the remainder will be 1.

    """
    if 1 < sum_weekdays_bit < 255:
        return_weekdays = set()
        weekdays_by_hex = map(lambda w: (w.hex_rep, w), Days)
        for weekday_hex in weekdays_by_hex:
            if weekday_hex[0] & sum_weekdays_bit != 0:
                return_weekdays.add(weekday_hex[1])
        return return_weekdays
    raise ValueError("weekdays bit sum should be between 2 and 254")

calc_duration(start_time, end_time) ⚓︎

Use to calculate the delta between two time values formated as %H:%M.

Source code in src/aioswitcher/schedule/tools.py
def calc_duration(start_time: str, end_time: str) -> str:
    """Use to calculate the delta between two time values formated as %H:%M."""
    start_datetime = datetime.strptime(start_time, "%H:%M")
    end_datetime = datetime.strptime(end_time, "%H:%M")
    if end_datetime < start_datetime:
        end_datetime += timedelta(days=1)
    return str(end_datetime - start_datetime)

hexadecimale_timestamp_to_localtime(hex_timestamp) ⚓︎

Decode an hexadecimale timestamp to localtime with the format %H:%M.

Parameters:

Name Type Description Default
hex_timestamp bytes

the hexadecimale timestamp.

required
Return

Localtime string with %H:%M format. e.g. "20:30".

Source code in src/aioswitcher/schedule/tools.py
def hexadecimale_timestamp_to_localtime(hex_timestamp: bytes) -> str:
    """Decode an hexadecimale timestamp to localtime with the format %H:%M.

    Args:
        hex_timestamp: the hexadecimale timestamp.

    Return:
        Localtime string with %H:%M format. e.g. "20:30".
    """
    hex_time = (
        hex_timestamp[6:8]
        + hex_timestamp[4:6]
        + hex_timestamp[2:4]
        + hex_timestamp[0:2]
    )
    int_time = int(hex_time, 16)
    local_time = time.localtime(int_time)
    return time.strftime("%H:%M", local_time)

pretty_next_run(start_time, days=set()) ⚓︎

Create a literal for displaying the next run time.

Parameters:

Name Type Description Default
start_time str

the start of the schedule in "%H:%M" format, e.g. "17:00".

required
days Set[Days]

for recurring schedules, a list of days when none, will be today.

set()

Returns:

Type Description
str

A pretty string describing the next due run.

str

e.g. "Due next Sunday at 17:00".

Source code in src/aioswitcher/schedule/tools.py
def pretty_next_run(start_time: str, days: Set[Days] = set()) -> str:
    """Create a literal for displaying the next run time.

    Args:
        start_time: the start of the schedule in "%H:%M" format, e.g. "17:00".
        days: for recurring schedules, a list of days when none, will be today.

    Returns:
        A pretty string describing the next due run.
        e.g. "Due next Sunday at 17:00".

    """
    if not days:
        return f"Due today at {start_time}"

    current_datetime = datetime.utcnow()
    current_weekday = current_datetime.weekday()

    current_time = datetime.strptime(
        current_datetime.time().strftime("%H:%M"), "%H:%M"
    ).time()
    schedule_time = datetime.strptime(start_time, "%H:%M").time()
    current_time_plus_one_hour = (
        datetime.combine(datetime.today(), current_time) + timedelta(hours=1)
    ).time()

    execution_days = [d.weekday for d in days]
    # if scheduled for later on today, return "due today"
    if current_weekday in execution_days and (
        current_time < schedule_time or current_time_plus_one_hour >= schedule_time
    ):
        return f"Due today at {start_time}"

    execution_days.sort()
    if current_weekday > execution_days[-1]:
        next_exc_day = execution_days[0]
    else:
        next_exc_day = list(filter(lambda d: d >= current_weekday, execution_days))[0]

    # if next excution day is tomorrow for the current day, or this is the week end
    # (today is sunday and tomorrow is monday)  return "due tomorrow"
    if next_exc_day - 1 == current_weekday or (
        next_exc_day == Days.MONDAY.weekday and current_weekday == Days.SUNDAY.weekday
    ):
        return f"Due tomorrow at {start_time}"

    # if here, then the scuedle is due some other day this week, return "due at..."
    weekdays = dict(map(lambda d: (d.weekday, d), Days))
    return f"Due next {weekdays[next_exc_day].value} at {start_time}"

time_to_hexadecimal_timestamp(time_value) ⚓︎

Convert hours and minutes to a timestamp with the current date and encode.

Parameters:

Name Type Description Default
time_value str

time to convert. e.g. "21:00".

required
Return

Hexadecimal representation of the timestamp.

Source code in src/aioswitcher/schedule/tools.py
def time_to_hexadecimal_timestamp(time_value: str) -> str:
    """Convert hours and minutes to a timestamp with the current date and encode.

    Args:
        time_value: time to convert. e.g. "21:00".

    Return:
        Hexadecimal representation of the timestamp.

    """
    tsplit = time_value.split(":")
    str_timedate = time.strftime("%d/%m/%Y") + " " + tsplit[0] + ":" + tsplit[1]
    struct_timedate = time.strptime(str_timedate, "%d/%m/%Y %H:%M")
    timestamp = time.mktime(struct_timedate)
    binary_timestamp = pack("<I", int(timestamp))

    return hexlify(binary_timestamp).decode()

weekdays_to_hexadecimal(days) ⚓︎

Sum the requested weekdays bit representation and return as hexadecimal value.

Parameters:

Name Type Description Default
days Union[Days, Set[Days]]

the requested Weekday members.

required
Return

Hexadecimale representation of the sum of all requested days.

Source code in src/aioswitcher/schedule/tools.py
def weekdays_to_hexadecimal(days: Union[Days, Set[Days]]) -> str:
    """Sum the requested weekdays bit representation and return as hexadecimal value.

    Args:
        days: the requested Weekday members.

    Return:
        Hexadecimale representation of the sum of all requested days.

    """
    if days:
        if type(days) is Days:
            return "{:02x}".format(days.bit_rep)
        elif type(days) is set or len(days) == len(set(days)):  # type: ignore
            map_to_bits = map(lambda w: w.bit_rep, days)  # type: ignore
            return "{:02x}".format(int(sum(map_to_bits)))
    raise ValueError("no days requested")