Source code for server_fan

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Central IoT server, fan manageer, and MQTT coordinator.

Script provides following functionalities:

- Script manages a fan attached to a GPIO pin for cooling the system
  on the basis of the system temperature provided by the SoC.
- Script acts as a MQTT coordinator utilizing local MQTT broker ``mosquitto``
  for data exchange within the script as well as with outside environment.
- Script communicates with cloud services ``ThingSpeak`` and ``Blynk``.
- Script publishes system temperature and fan status (running, idle) to all
  three platforms: ``local MQTT broker``, ``ThingSpeak cloud``, ``Blynk mobile
  application``.
- Script can receive commands from `local MQTT broker` and from
  `Blynk mobile app` in order to change its behaviour during runnig, e.g.,
  turn on or off the fan, change fan trigger temperatures, etc.

"""
__version__ = "0.5.0"
__status__ = "Beta"
__author__ = "Libor Gabaj"
__copyright__ = "Copyright 2018, " + __author__
__credits__ = [__author__]
__license__ = "MIT"
__maintainer__ = __author__
__email__ = "libor.gabaj@gmail.com"

# Standard library modules
import time
import os
import os.path
import sys
import argparse
import logging
# Third party modules
import gbj_pythonlib_sw.config as modConfig
import gbj_pythonlib_sw.mqtt as modMQTT
import gbj_pythonlib_sw.statfilter as modFilter
import gbj_pythonlib_sw.timer as modTimer
import gbj_pythonlib_sw.trigger as modTrigger
import gbj_pythonlib_hw.orangepi as modOrangePi
import BlynkLib as modBlynk


###############################################################################
# Script constants - General states and MQTT commands
###############################################################################
ON = "ON"
OFF = "OFF"
TOGGLE = "TOGGLE"
RESET = "RESET"


###############################################################################
# Script constants - Fan MQTT commands and maps
###############################################################################
CMD_FAN_ON = ON
CMD_FAN_OFF = OFF
CMD_FAN_TOGGLE = TOGGLE
CMD_FAN_PERCON = "PERCON"  # Percentage of maximal temperature for fan on
CMD_FAN_PERCOFF = "PERCOFF"  # Percentage of maximal temperature for fan off


###############################################################################
# Script constants - ThingSpeak statuses
###############################################################################
STATUS_FAN_ON = "FAN-ON"
STATUS_FAN_OFF = "FAN-OFF"


###############################################################################
# Script global variables
###############################################################################
script_run = True  # Flag about running script in a loop
cmdline = None  # Object with command line arguments
logger = None  # Object with standard logging
trigger = None  # Object with triggers
filter = None  # Object with statistical smoothing and filtering
config = None  # Object with MQTT configuration file processing
mqtt = None  # Object for MQTT broker manipulation
thingspeak = None  # Object for ThingSpeak MQTT manipulation
pi = None  # Object with OrangePi GPIO control
blynk = None  # Object for Blynk application cooperation


###############################################################################
# Helper functions
###############################################################################


###############################################################################
# General actions
###############################################################################
[docs]def action_fan(command, value=None): """Perform command for the fan. Arguments --------- command : str Action name to be realized. value Any value that the action should be realized with. """ # Controlling fan if command in [CMD_FAN_ON, CMD_FAN_OFF, CMD_FAN_TOGGLE]: # Suppress publishing useless command, i.e., the command changes pin # state that it already has. try: if command == CMD_FAN_TOGGLE: if pi.is_pin_on(pi.PIN_FAN): command = CMD_FAN_OFF else: command = CMD_FAN_ON if command == CMD_FAN_ON: if pi.is_pin_on(pi.PIN_FAN): return pi.pin_on(pi.PIN_FAN) elif command == CMD_FAN_OFF: if pi.is_pin_off(pi.PIN_FAN): return pi.pin_off(pi.PIN_FAN) else: return logger.info("Fan set to %s", command) except Exception as errmsg: logger.error("Fan command %s failed: %s.", command, errmsg) # Publishing action mqtt_publish_fan_status() thingspeak_publish(fan_status=True) blynk_publish_fan_status() # Updating fan temperature percentage ON if command == CMD_FAN_PERCON: try: value = abs(float(value)) setup_trigger_fan(fan_perc_on=value) logger.info("Updated fan percentage ON=%s%%", value) mqtt_publish_fan_percon() blynk_publish_fan_percon() except Exception: logger.error("Fan command %s failed", command) # Updating fan temperature percentage ON if command == CMD_FAN_PERCOFF: try: value = abs(float(value)) setup_trigger_fan(fan_perc_off=value) logger.info("Updated fan percentage OFF=%s%%", value) mqtt_publish_fan_percoff() blynk_publish_fan_percoff() except Exception: logger.error("Fan command %s failed", command) # Updating fan temperature percentages if command == RESET: setup_trigger_fan( fan_perc_on=pi.FAN_PERC_ON_DEF, fan_perc_off=pi.FAN_PERC_OFF_DEF, ) logger.info("Reset fan limits") mqtt_publish_fan_limits() blynk_publish_fan_limits()
[docs]def action_script(command): """Perform command for this script itself. Arguments --------- command : str Received command to be realized: ``{"EXIT"}``. """ # Stop script if command == "EXIT": global script_run script_run = False
############################################################################### # MQTT actions ###############################################################################
[docs]def mqtt_publish_temp(): """Publish SoC temperature to a MQTT topic.""" if not mqtt.get_connected(): return message = filter.result() option = "server_data_temp" section = mqtt.GROUP_TOPICS try: mqtt.publish(message, option, section) logger.debug( "Published temperature %s°C to MQTT topic %s.", filter.result(), mqtt.topic_name(option, section)) except Exception as errmsg: logger.error( "Temperature publishing to MQTT topic option %s:[%s] failed: %s.", option, section, errmsg)
[docs]def mqtt_publish_fan_status(): """Publish fan status to the MQTT status topic.""" if not mqtt.get_connected(): return cfg_option = "server_status_fan" cfg_section = mqtt.GROUP_TOPICS if pi.is_pin_on(pi.PIN_FAN): message = STATUS_FAN_ON else: message = STATUS_FAN_OFF try: mqtt.publish(message, cfg_option, cfg_section) logger.debug( "Published fan status %s to MQTT topic %s.", message, mqtt.topic_name(cfg_option, cfg_section), ) except Exception as errmsg: logger.error( "Publishing fan status %s to MQTT topic %s failed: %s.", message, mqtt.topic_name(cfg_option, cfg_section), errmsg, )
[docs]def mqtt_publish_fan_percon(): """Publish fan temperature percentage ON to the MQTT status topic.""" if not mqtt.get_connected(): return cfg_option = "server_status_fan_percon" cfg_section = mqtt.GROUP_TOPICS try: mqtt.publish(str(pi.FAN_PERC_ON_CUR), cfg_option, cfg_section) logger.debug( "Published fan percentage ON=%s%% to MQTT topic %s.", pi.FAN_PERC_ON_CUR, mqtt.topic_name(cfg_option, cfg_section)) except Exception as errmsg: logger.error( "Publishing fan percentage ON=%s%% to MQTT topic %s failed: %s.", pi.FAN_PERC_ON_CUR, mqtt.topic_name(cfg_option, cfg_section), errmsg)
[docs]def mqtt_publish_fan_percoff(): """Publish fan temperature percentage OFF to the MQTT status topic.""" if not mqtt.get_connected(): return cfg_option = "server_status_fan_percoff" cfg_section = mqtt.GROUP_TOPICS try: mqtt.publish(str(pi.FAN_PERC_OFF_CUR), cfg_option, cfg_section) logger.debug( "Published fan percentage OFF=%s%% to MQTT topic %s.", pi.FAN_PERC_OFF_CUR, mqtt.topic_name(cfg_option, cfg_section)) except Exception as errmsg: logger.error( "Publishing fan percentage OFF=%s%% to MQTT topic %s failed: %s.", pi.FAN_PERC_OFF_CUR, mqtt.topic_name(cfg_option, cfg_section), errmsg)
[docs]def mqtt_publish_fan_limits(): """Publish fan temperature percentages to the MQTT status topic.""" mqtt_publish_fan_percon() mqtt_publish_fan_percoff()
[docs]def mqtt_message_log(message): """Log receiving from a MQTT topic. Arguments --------- message : str An instance of ``MQTTMessage``. This is a class with members `topic`, `payload`, `qos`, `retain`. Returns ------- bool Flag about present message payload. See Also -------- gbj_pythonlib_sw.mqtt Module for MQTT processing. """ logger.debug( "Message from MQTT topic %s with qos %s and retain %s", message.topic, message.qos, message.retain) if message.payload is None: return False logger.debug("%s: %s", sys._getframe(1).f_code.co_name, message.payload.decode("utf-8")) return True
[docs]def thingspeak_publish(fan_status=False): """Publish to ThingSpeak. Arguments --------- fan_status : bool Flag determining whether current fan state should be published as a ThingSpeak channel status. Notes ----- Data fields are published automatically. """ fields = {thingspeak.FIELD_TEMP: filter.result()} # Changed fan state since recent publishing fan_state_cur = pi.pin_state(pi.PIN_FAN) if not hasattr(thingspeak, "fan_state_old"): thingspeak.fan_state_old = fan_state_cur if fan_state_cur != thingspeak.fan_state_old: fields[thingspeak.FIELD_FAN] = fan_state_cur logger.debug( "Fan state change %s -> %s for ThingSpeak field%s", thingspeak.fan_state_old, fan_state_cur, thingspeak.FIELD_FAN) thingspeak.fan_state_old = fan_state_cur # Fan status status = None if fan_status: if pi.is_pin_on(pi.PIN_FAN): status = STATUS_FAN_ON else: status = STATUS_FAN_OFF status += ": {}°C {}".format( fields[thingspeak.FIELD_TEMP], time.ctime() ) # Publication to ThingSpeak try: logger.debug("Publish to ThingSpeak") if thingspeak.publish(fields=fields, status=status): logger.debug( "Published temperature %s°C to ThingSpeak field%s", fields[thingspeak.FIELD_TEMP], thingspeak.FIELD_TEMP) if thingspeak.FIELD_FAN in fields: logger.debug( "Published fan state %s to ThingSpeak field%s", fields[thingspeak.FIELD_FAN], thingspeak.FIELD_FAN) if status is not None and len(status) > 0: logger.debug( "Published channel status %s to ThingSpeak", status) except Exception as errmsg: logger.error( "Publishing to ThingSpeak failed: %s", errmsg)
[docs]def blynk_publish_fan_status(): """Publish fan status to Blynk mobile application.""" global blynk if blynk is None: return if pi.is_pin_on(pi.PIN_FAN): fan_status = ON led_value = 255 else: fan_status = OFF led_value = 0 try: blynk.virtual_write(blynk.VPIN_FAN_LED, led_value) logger.debug("Published fan status %s to Blynk.", fan_status) except Exception: logger.error("Publishing fan status to Blynk failed.")
[docs]def blynk_publish_fan_percon(): """Publish fan temperature percentage ON to Blynk mobile application.""" global blynk if blynk is None: return try: blynk.virtual_write(blynk.VPIN_FAN_PERCON, pi.FAN_PERC_ON_CUR) logger.debug("Published fan percentages ON=%s%% to Blynk.", pi.FAN_PERC_ON_CUR) except Exception: logger.error("Publishing fan percentage ON to Blynk failed.")
[docs]def blynk_publish_fan_percoff(): """Publish fan temperature percentage OFF to Blynk mobile application.""" global blynk if blynk is None: return try: blynk.virtual_write(blynk.VPIN_FAN_PERCOFF, pi.FAN_PERC_OFF_CUR) logger.debug("Published fan percentages OFF=%s%% to Blynk.", pi.FAN_PERC_OFF_CUR) except Exception: logger.error("Publishing fan percentage OFF to Blynk failed.")
[docs]def blynk_publish_fan_limits(): """Publish fan temperature percentages to Blynk mobile application.""" blynk_publish_fan_percon() blynk_publish_fan_percoff()
############################################################################### # Callback functions ###############################################################################
[docs]def cbTimer_temp_measure(*arg, **kwargs): """Measure current CPU temperature.""" # blynk_publish() exec_last = kwargs.pop("exec_last", False) logger.debug( "Measured temperature %s°C", filter.result(pi.measure_temperature()) ) if exec_last: # global script_run # script_run = False pass
[docs]def cbTimer_temp_publish(*arg, **kwargs): """Publish current CPU temperature.""" logger.debug( "Publish temperature %s°C", filter.result() ) mqtt_publish_temp()
[docs]def cbTimer_temp_triggers(*arg, **kwargs): """Execute CPU temperature triggers.""" trigger.exec_triggers(filter.result(), ids=["fanon", "fanoff"])
[docs]def cbTimer_thingspeak(*arg, **kwargs): """Publish to ThingSpeak.""" thingspeak_publish()
[docs]def cbTrigger_fan(*args, **kwargs): """Execute command for the fan.""" command = kwargs.pop("cmd", None) if command is None: return action_fan(command)
[docs]def cbMqtt_on_connect(client, userdata, flags, rc): """Process actions when the broker responds to a connection request. Arguments --------- client : object MQTT client instance for this callback. userdata The private user data. flags : dict Response flags sent by the MQTT broker. rc : int The connection result (result code). See Also -------- gbj_pythonlib_sw.mqtt._on_connect() Description of callback arguments for proper utilizing. """ if rc == 0: logger.debug("Connected to %s: %s", str(mqtt), userdata) setup_mqtt_filters() mqtt_publish_fan_status() mqtt_publish_fan_limits() else: logger.error("Connection to MQTT broker failed: %s", userdata)
[docs]def cbMqtt_on_disconnect(client, userdata, rc): """Process actions when the client disconnects from the broker. Arguments --------- client : object MQTT client instance for this callback. userdata The private user data. rc : int The connection result (result code). See Also -------- gbj_pythonlib_sw.mqtt._on_connect() Description of callback arguments for proper utilizing. """ logger.warning("Disconnected from %s: %s", str(mqtt), userdata)
[docs]def cbMqtt_on_subscribe(client, userdata, mid, granted_qos): """Process actions when the broker responds to a subscribe request. Arguments --------- client : object MQTT client instance for this callback. userdata The private user data. mid : int The message ID from the subscribe request. granted_qos : int The list of integers that give the QoS level the broker has granted for each of the different subscription requests. """ # logger.debug("Subscribed to MQTT topic with message id %d", mid) pass
[docs]def cbMqtt_on_message(client, userdata, message): """Process actions when a non-filtered message has been received. Arguments --------- client : object MQTT client instance for this callback. userdata The private user data. message : object An instance of ``MQTTMessage``. This is a class with members `topic`, `payload`, `qos`, `retain`. Notes ----- - The topic that the client subscribes to and the message does not match an existing topic filter callback. - Use message_callback_add() to define a callback that will be called for specific topic filters. This function serves as fallback when none topic filter matched. """ if not mqtt_message_log(message): return
[docs]def cbMqtt_on_message_data(client, userdata, message): """Process server data send through a MQTT topic(s). Arguments --------- client : object MQTT client instance for this callback. userdata The private user data. message : object An instance of ``MQTTMessage``. This is a class with members `topic`, `payload`, `qos`, `retain`. """ if not mqtt_message_log(message): return # CPU Temperature if message.topic == mqtt.topic_name("server_data_temp"): value = float(message.payload) logger.debug("Received temperature %s°C", value) # Unexpected data else: logger.warning( "Received unknown data %s from topic %s", message.payload.decode("utf-8"), message.topic)
[docs]def cbMqtt_on_message_command(client, userdata, message): """Process server command at receiving a message from the command topic(s). Arguments --------- client : object MQTT client instance for this callback. userdata The private user data. message : object An instance of ``MQTTMessage``. This is a class with members `topic`, `payload`, `qos`, `retain`. Notes ----- - The topic that the client subscribes to and the message match the topic filter for server commands. """ if not mqtt_message_log(message): return # Command command = message.payload.decode("utf-8") if message.topic == mqtt.topic_name("server_command"): logger.debug( "Received general command %s from topic %s", command, message.topic) action_script(command) # Test data elif message.topic == mqtt.topic_name("server_command_test"): logger.debug( "Received test command %s from topic %s", command, message.topic) # Fan control elif message.topic == mqtt.topic_name("server_command_fan"): logger.debug( "Received fan command %s from topic %s", command, message.topic) action_fan(command) elif message.topic in [mqtt.topic_name("server_command_fan_percon"), mqtt.topic_name("server_command_fan_percoff"), ]: command = message.topic.split("/").pop().upper() logger.debug( "Received fan command %s with value %s from topic %s", command, message.payload.decode("utf-8"), message.topic) action_fan(command, message.payload.decode("utf-8")) # Unexpected data else: logger.warning( "Received unknown command %s from topic %s", message.payload.decode("utf-8"), message.topic)
[docs]def cbBlynk_on_connect(): """Process actions when the script is connected to Blynk cloud.""" # Update mobile application blynk_publish_fan_status() blynk_publish_fan_limits() logger.debug("Blynk mobile application synchronized")
############################################################################### # Setup functions ###############################################################################
[docs]def setup_cmdline(): """Define command line arguments.""" config_file = os.path.splitext(os.path.abspath(__file__))[0] + ".ini" log_folder = "/var/log" parser = argparse.ArgumentParser( description="Tester, version " + __version__ ) # Position arguments parser.add_argument( "config", type=argparse.FileType("r"), nargs="?", default=config_file, help="Configuration INI file, default: " + config_file ) # Options parser.add_argument( "-V", "--version", action="version", version="%(prog)s " + __version__, help="Current version of the script." ) parser.add_argument( "-v", "--verbose", choices=["debug", "warning", "info", "error", "critical"], default="warning", help="Level of logging to console." ) parser.add_argument( "-l", "--loglevel", choices=["debug", "warning", "info", "error", "critical"], default="debug", help="Level of logging to log file." ) parser.add_argument( "-d", "--logdir", default=log_folder, help="Folder of a log file, default " + log_folder ) parser.add_argument( "-c", "--configuration", action="store_true", help="""Print configuration parameters in form of INI file content.""" ) # Process command line arguments global cmdline cmdline = parser.parse_args()
[docs]def setup_logger(): """Configure logging facility.""" global logger # Set logging to file for module and script logging log_file = "/".join([cmdline.logdir, os.path.basename(__file__) + ".log"]) logging.basicConfig( level=getattr(logging, cmdline.loglevel.upper()), format="%(asctime)s - %(levelname)-8s - %(name)-20s: %(message)s", filename=log_file, filemode="w" ) # Set console logging formatter = logging.Formatter( "%(levelname)-8s - %(name)-20s: %(message)s") console_handler = logging.StreamHandler(sys.stdout) console_handler.setLevel(getattr(logging, cmdline.verbose.upper())) console_handler.setFormatter(formatter) logger = logging.getLogger("{} {}".format( os.path.basename(__file__), __version__)) logger.addHandler(console_handler) logger.warning("Script started from file %s", os.path.abspath(__file__))
[docs]def setup_config(): """Define configuration file management.""" global config config = modConfig.Config(cmdline.config) # Construct configuration file content if cmdline.configuration: config.get_content()
[docs]def setup_pi(): """Define GPIO control. Notes ----- - Operational pin names are stored in the object as attributes. - Default fan percentage limits are stored in the object as attributes. """ global pi pi = modOrangePi.OrangePiOne() pi.PIN_FAN = config.option("pin_fan_name", "Fan") # pi.PIN_LED = config.option("pin_led_name", "Fan") # Temperature percentage for fan ON pi.FAN_PERC_ON_DEF = abs(float(config.option( "percentage_maxtemp_on", "Fan", 85.0))) pi.FAN_PERC_ON_MIN = 80.0 pi.FAN_PERC_ON_MAX = 95.0 pi.FAN_PERC_ON_CUR = pi.FAN_PERC_ON_DEF # Temperature percentage for fan OFF pi.FAN_PERC_OFF_DEF = abs(float(config.option( "percentage_maxtemp_off", "Fan", 75.0))) pi.FAN_PERC_OFF_MIN = 60.0 pi.FAN_PERC_OFF_MAX = 75.0 pi.FAN_PERC_OFF_CUR = pi.FAN_PERC_OFF_DEF
[docs]def setup_mqtt(): """Define MQTT management.""" global mqtt mqtt = modMQTT.MqttBroker(config) mqtt.connect( username=config.option("username", mqtt.GROUP_BROKER), password=config.option("password", mqtt.GROUP_BROKER), connect=cbMqtt_on_connect, disconnect=cbMqtt_on_disconnect, subscribe=cbMqtt_on_subscribe, message=cbMqtt_on_message, )
[docs]def setup_mqtt_filters(): """Define MQTT topic filters and subscribe to them. Notes ----- - The function is called in on_connect callback function after successful connection to a MQTT broker. """ mqtt.callback_filters( server_filter_data=cbMqtt_on_message_data, server_filter_command=cbMqtt_on_message_command, ) try: mqtt.subscribe_filters() except Exception as errcode: logger.error( "MQTT subscribtion to topic filters failed with error code %s", errcode)
[docs]def setup_thingspeak(): """Define ThingSpeak management.""" global thingspeak thingspeak = modMQTT.ThingSpeak(config) thingspeak.FIELD_TEMP = int(config.option("field_temp", thingspeak.GROUP_BROKER, 1)) thingspeak.FIELD_FAN = int(config.option("field_fan", thingspeak.GROUP_BROKER, 2))
[docs]def setup_filter(): """Define statistical smoothing and filtering.""" global filter filter = modFilter.StatFilterExponential( decimals=3, factor=0.2 )
# filter = gbj_statfilter.StatFilterRunning( # decimals=3, # stat_type=gbj_statfilter.MEDIAN, # )
[docs]def setup_trigger(): """Define triggers for evaluating value limits.""" global trigger trigger = modTrigger.Trigger() setup_trigger_fan()
[docs]def setup_trigger_fan(fan_perc_on=None, fan_perc_off=None): """Define triggers for controlling fan by SoC temperature. Arguments --------- fan_perc_on : float Percentage of maximal temperature for turning fan on. fan_perc_off : float Percentage of maximal temperature for turning fan off. """ # Sanitize parameters pi.FAN_PERC_ON_CUR = max(min(float(fan_perc_on or pi.FAN_PERC_ON_CUR), pi.FAN_PERC_ON_MAX), pi.FAN_PERC_ON_MIN) pi.FAN_PERC_OFF_CUR = max(min(float(fan_perc_off or pi.FAN_PERC_OFF_CUR), pi.FAN_PERC_OFF_MAX), pi.FAN_PERC_OFF_MIN) if pi.FAN_PERC_OFF_CUR > pi.FAN_PERC_ON_CUR: pi.FAN_PERC_OFF_CUR, pi.FAN_PERC_ON_CUR \ = pi.FAN_PERC_ON_CUR, pi.FAN_PERC_OFF_CUR # Set triggers logger.debug( "Setup fan triggers: %s = %s%%, %s = %s%%", ON, pi.FAN_PERC_ON_CUR, OFF, pi.FAN_PERC_OFF_CUR) trigger.set_trigger( id="fanon", mode=modTrigger.UPPER, value=pi.convert_percentage_temperature(pi.FAN_PERC_ON_CUR), callback=cbTrigger_fan, cmd=CMD_FAN_ON, # Arguments to callback ) trigger.set_trigger( id="fanoff", mode=modTrigger.LOWER, value=pi.convert_percentage_temperature(pi.FAN_PERC_OFF_CUR), callback=cbTrigger_fan, cmd=CMD_FAN_OFF, # Arguments to callback )
[docs]def setup_timers(): """Define dictionary of timers.""" # Timer 01 name = "Timer_temp" cfg_section = "TimerTemperature" # Measurement period c_period = float(config.option("period_measure", cfg_section, 5.0)) c_period = max(min(c_period, 60.0), 1.0) # Publishing prescale c_publish = int(config.option("prescale_publish", cfg_section, 3)) c_publish = max(min(c_publish, 10), 1) # Trigger evaluation prescale c_triggers = int(config.option("prescale_triggers", cfg_section, 6)) c_triggers = max(min(c_triggers, 1000), 1) logger.debug( "Setup timer %s: period = %ss, publish = %sx, triggers = %sx", name, c_period, c_publish, c_triggers) # Definition timer1 = modTimer.Timer( c_period, cbTimer_temp_measure, name=name, # count=9, ) timer1.prescaler(c_publish, cbTimer_temp_publish) timer1.prescaler(c_triggers, cbTimer_temp_triggers) modTimer.register_timer(name, timer1) # Timer 02 name = "Timer_thingspeak" cfg_section = thingspeak.GROUP_BROKER # Measurement period c_period = float(config.option("period_publish", cfg_section, 60.0)) c_period = max(c_period, thingspeak.get_publish_delay()) logger.debug( "Setup timer %s: period = %ss", name, c_period) # Definition timer2 = modTimer.Timer( c_period, cbTimer_thingspeak, name=name, # count=9, ) modTimer.register_timer(name, timer2) # Start all timers modTimer.start_timers()
[docs]def setup_blynk(): """Define Blynk parameters.""" global blynk config_group = "Blynk" blynk = modBlynk.Blynk(config.option("blynk_auth", config_group)) blynk.on_connect(cbBlynk_on_connect) # Store Blynk colors blynk.COLOR_GREEN = "#23C48E" blynk.COLOR_BLUE = "#04C0F8" blynk.COLOR_YELLOW = "#ED9D00" blynk.COLOR_RED = "#D3435C" blynk.COLORDARK_BLUE = "#5F7CD8" # Store virtual pins blynk.VPIN_TEMP = abs(int(config.option("vpin_temp", config_group))) blynk.VPIN_FAN_LED = abs(int(config.option("vpin_fan_led", config_group))) blynk.VPIN_FAN_BTN = abs(int(config.option("vpin_fan_btn", config_group))) blynk.VPIN_FAN_PERCON = abs(int(config.option("vpin_fan_percon", config_group))) blynk.VPIN_FAN_PERCOFF = abs(int(config.option("vpin_fan_percoff", config_group))) @blynk.VIRTUAL_WRITE(blynk.VPIN_FAN_BTN) def blynk_fan_button(button_state): """Receive command for fan from mobile app. Arguments --------- button_state : str Received value from Blynk button widget. """ # React only on pushing the button and ignore releasing it if int(button_state): logger.debug("Fan button state %s from Blynk virtual pin %s", button_state, blynk.VPIN_FAN_BTN) action_fan(CMD_FAN_TOGGLE) @blynk.VIRTUAL_READ(blynk.VPIN_TEMP) def blynk_read_temperature(): """Send data to mobile app on demand.""" blynk.virtual_write(blynk.VPIN_TEMP, filter.result()) @blynk.VIRTUAL_WRITE(blynk.VPIN_FAN_PERCON) def blynk_fan_percon(value): """Receive temperature percentage for fan ON from mobile app. Arguments --------- value : str Received percentage for fan ON value from Blynk numeric input widget. """ # React only on pushing the button and ignore releasing it logger.debug("Fan ON percentage %s%% from Blynk virtual pin %s", value, blynk.VPIN_FAN_PERCON) action_fan(CMD_FAN_PERCON, value) @blynk.VIRTUAL_WRITE(blynk.VPIN_FAN_PERCOFF) def blynk_fan_percoff(value): """Receive temperature percentage for fan OFF from mobile app. Arguments --------- value : str Received percentage for fan OFF value from Blynk numeric input widget. """ # React only on pushing the button and ignore releasing it logger.debug("Fan OFF percentage %s%% from Blynk virtual pin %s", value, blynk.VPIN_FAN_PERCOFF) action_fan(CMD_FAN_PERCOFF, value)
[docs]def setup(): """Global initialization.""" pass
[docs]def loop(): """Wait for keyboard or system exit.""" try: global blynk if blynk is None: logger.info("Script loop started") while (script_run): time.sleep(1) logger.warning("Script finished") else: logger.info("Script run by BLYNK") blynk.run() except (KeyboardInterrupt, SystemExit): logger.warning("Script cancelled") finally: modTimer.stop_timers()
[docs]def main(): """Fundamental control function.""" setup_cmdline() setup_logger() setup_config() setup_pi() setup_mqtt() setup_thingspeak() setup_filter() setup_trigger() setup_timers() setup_blynk() setup() loop()
if __name__ == "__main__": if os.getegid() != 0: sys.exit('Script must be run as root') main()