厨房设置 MQ-2 烟雾传感器检测天然气等(002):设置 WiFi 联网、数据读取以及数据上报

NO.1
连接wifi

在 boot.py 文件中写入以下代码

参考官网文档

https://docs.micropython.org/en/latest/esp8266/quickref.html#networking 

essid为wifi名称

password为wifi密码

def do_connect(essid='essid', password='password', timeout_ms=None):
    """Connect the station (STA) WiFi interface to an access point.

    Args:
        essid: Name of the WiFi network to join.
        password: Password of the WiFi network.
        timeout_ms: Maximum time to wait for the connection, in
            milliseconds. ``None`` (the default) waits indefinitely.

    Returns:
        True if the interface is connected when the function returns,
        False if ``timeout_ms`` elapsed first.
    """
    import network
    import time

    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    if not wlan.isconnected():
        print('connecting to network...')
        wlan.connect(essid, password)
        started = time.ticks_ms()
        while not wlan.isconnected():
            # ticks_diff handles wrap-around of the millisecond counter.
            if (
                timeout_ms is not None
                and time.ticks_diff(time.ticks_ms(), started) > timeout_ms
            ):
                print('network connection timed out')
                return False
            # Poll at a modest rate instead of spinning at full speed.
            time.sleep_ms(100)
    print('network config:', wlan.ifconfig())
    return True

boot.py 文件和 main.py 文件一样,会在 ESP8266 通电启动时自动执行(先执行 boot.py,再执行 main.py)。注意:上面的代码只定义了函数,还需要在 boot.py 末尾调用 do_connect() 才会真正联网。

 

NO.2
读取数据

micropython读取MQ-2烟雾传感器数据

github参考地址

https://github.com/brainiac/MQ2Py 

本文示例代码

 

# include "BaseMQ.h"
## Ported from https://github.com/amperka/TroykaMQ
## Author: Alexey Tveritinov [kartun@yandex.ru]
from machine import Pin, ADC
from micropython import const
import utime
from math import exp, log
import urequests
class BaseMQ(object):
    ## Base driver for MQ-family analog gas sensors.
    #  Reads the sensor through an ADC pin, converts the raw reading to a sensor
    #  resistance (Rs), and scales Rs against a calibrated base resistance (Ro).
    #  Subclasses must implement getRoInCleanAir().

    ## Number of samples averaged per accurate measurement / calibration
    MQ_SAMPLE_TIMES = const(5)
    ## Delay after each sample, in ms
    MQ_SAMPLE_INTERVAL = const(5000)
    ## Heating period, in ms
    MQ_HEATING_PERIOD = const(60000)
    ## Cooling period, in ms
    MQ_COOLING_PERIOD = const(90000)
    ## This strategy measures values immediately, so it might be inaccurate. Should be
    #  suitable for tracking dynamics, rather than actual values
    STRATEGY_FAST = const(1)
    ## This strategy measures values separately. For a single measurement
    #    MQ_SAMPLE_TIMES samples are taken at interval MQ_SAMPLE_INTERVAL.
    #    I.e. for multi-data sensors, like MQ2, it would take a while to receive full data
    STRATEGY_ACCURATE = const(2)
    ## ADC count that corresponds to the full base voltage
    MQ_ADC_MAX = 1023

    ## Initialization.
    #  @param pinData Data pin. Should be ADC pin
    #  @param pinHeater Pass -1 if heater connected to main power supply. Otherwise pass another pin
    #  @param boardResistance On troyka modules there is 10K resistor, on other boards could be other values
    #  @param baseVoltage Optionally board could run on 3.3 Volts, base voltage is 5.0 Volts. Passing incorrect values
    #  would cause incorrect measurements
    #  @param measuringStrategy Currently two main strategies are implemented:
    #  - STRATEGY_FAST = 1 In this case data would be taken immediately. Could be unreliable
    #  - STRATEGY_ACCURATE = 2 In this case data would be taken MQ_SAMPLE_TIMES times with MQ_SAMPLE_INTERVAL delay
    #  For sensor with different gases it would take a while
    def __init__(
        self,
        pinData,
        pinHeater=-1,
        boardResistance=10,
        baseVoltage=5.0,
        measuringStrategy=STRATEGY_ACCURATE,
    ):
        ## Heater is enabled
        self._heater = False
        ## Cooling (energy saving) phase is active
        self._cooler = False
        ## Base resistance of module; -1 until calibrate() has run
        self._ro = -1
        ## Set to True once calibrate() has stored a base resistance
        self._stateCalibrate = False
        ## True when the heater is driven from a dedicated pin
        self._useSeparateHeater = False
        self._pinHeater = None
        ## Timestamp of the last heater state change, in ms ticks
        self._prMillis = utime.ticks_ms()
        self._baseVoltage = baseVoltage
        ## @var _lastMesurement - when last accurate measurement was taken
        self._lastMesurement = utime.ticks_ms()
        self._rsCache = None
        self.dataIsReliable = False
        self.pinData = ADC(pinData)
        self.measuringStrategy = measuringStrategy
        self._boardResistance = boardResistance
        if pinHeater != -1:
            # Store under the underscore-prefixed names that the heater methods read.
            self._useSeparateHeater = True
            self._pinHeater = Pin(pinHeater, Pin.OUT)

    ## Abstract method, should be implemented in specific sensor driver.
    #  Base RO differs for every sensor family
    def getRoInCleanAir(self):
        raise NotImplementedError("Please Implement this method")

    ## Sensor calibration
    #  @param ro For first time sensor calibration do not pass RO: the sensor is then sampled
    #  MQ_SAMPLE_TIMES times and the averaged resistance is divided by getRoInCleanAir().
    #  Pass a value saved from a previous run (see _ro attribute) to bypass the sampling.
    def calibrate(self, ro=-1):
        if ro == -1:
            total = 0
            print("Calibrating:")
            # Take exactly MQ_SAMPLE_TIMES samples so the average below is unbiased.
            for i in range(self.MQ_SAMPLE_TIMES):
                print("Step {0}".format(i))
                total += self.__calculateResistance__(self.pinData.read())
                utime.sleep_ms(self.MQ_SAMPLE_INTERVAL)
            ro = total / (self.getRoInCleanAir() * self.MQ_SAMPLE_TIMES)
        self._ro = ro
        self._stateCalibrate = True

    ## Enable heater. Is not applicable for 3-wire setup
    def heaterPwrHigh(self):
        if self._useSeparateHeater:
            self._pinHeater.on()
        self._heater = True
        self._prMillis = utime.ticks_ms()

    ## Move heater to energy saving mode. Is not applicable for 3-wire setup
    #  Only the state flags are updated; the heater pin itself is not driven here.
    def heaterPwrLow(self):
        self._heater = True
        self._cooler = True
        self._prMillis = utime.ticks_ms()

    ## Turn off heater. Is not applicable for 3-wire setup
    def heaterPwrOff(self):
        if self._useSeparateHeater:
            self._pinHeater.off()
        self._heater = False

    ## Convert a raw ADC reading to the sensor resistance, in the units of boardResistance
    #  @param rawAdc Raw ADC count
    def __calculateResistance__(self, rawAdc):
        # Clamp to the open interval (0, MQ_ADC_MAX): 0 would divide by zero, and a
        # reading at or above full scale would give a zero/negative resistance whose
        # logarithm is undefined in readScaled().
        if rawAdc < 1:
            rawAdc = 1
        elif rawAdc >= self.MQ_ADC_MAX:
            rawAdc = self.MQ_ADC_MAX - 1
        vrl = rawAdc * (self._baseVoltage / self.MQ_ADC_MAX)
        rsAir = (self._baseVoltage - vrl) / vrl * self._boardResistance
        return rsAir

    ## Data reading
    # With STRATEGY_ACCURATE, MQ_SAMPLE_TIMES samples are averaged, cached and flagged reliable.
    # With any other strategy a single sample is returned and dataIsReliable is cleared.
    def __readRs__(self):
        if self.measuringStrategy == self.STRATEGY_ACCURATE:
            total = 0
            for _ in range(self.MQ_SAMPLE_TIMES):
                total += self.__calculateResistance__(self.pinData.read())
                utime.sleep_ms(self.MQ_SAMPLE_INTERVAL)
            rs = total / self.MQ_SAMPLE_TIMES
            self._rsCache = rs
            self.dataIsReliable = True
            self._lastMesurement = utime.ticks_ms()
        else:
            rs = self.__calculateResistance__(self.pinData.read())
            self.dataIsReliable = False
        return rs

    ## Map the current Rs/Ro ratio through exp((ln(ratio) - b) / a)
    #  @param a Divisor applied after subtracting b
    #  @param b Offset subtracted from ln(ratio)
    def readScaled(self, a, b):
        return exp((log(self.readRatio()) - b) / a)

    ## Current sensor resistance divided by the calibrated base resistance.
    #  calibrate() must have been called first, otherwise _ro is still -1.
    def readRatio(self):
        return self.__readRs__() / self._ro

    ## Checks if sensor heating is completed. Is not applicable for 3-wire setup
    def heatingCompleted(self):
        elapsed = utime.ticks_diff(utime.ticks_ms(), self._prMillis)
        return bool(
            self._heater and not self._cooler and elapsed > self.MQ_HEATING_PERIOD
        )

    ## Checks if sensor cooling is completed. Is not applicable for 3-wire setup
    def coolanceCompleted(self):
        elapsed = utime.ticks_diff(utime.ticks_ms(), self._prMillis)
        return bool(
            self._heater and self._cooler and elapsed > self.MQ_COOLING_PERIOD
        )

    ## Starts sensor heating. @see heatingCompleted if heating is completed
    def cycleHeat(self):
        self._heater = False
        self._cooler = False
        self.heaterPwrHigh()
        print("Heated sensor")

    ## Use this to automatically bounce heating and cooling states
    #  @return True only when a cooling phase has just finished and the heater was switched off
    def atHeatCycleEnd(self):
        if self.heatingCompleted():
            self.heaterPwrLow()
            print("Cool sensor")
            return False
        elif self.coolanceCompleted():
            self.heaterPwrOff()
            return True
        else:
            return False
class MQ2(BaseMQ):
    ## MQ-2 sensor driver: supplies the clean-air coefficient and the
    #  per-gas (a, b) pairs that are handed to BaseMQ.readScaled().

    ## Clean air coefficient returned by getRoInCleanAir()
    MQ2_RO_BASE = 9.83

    ## Same parameters and defaults as the base class; forwards them unchanged.
    def __init__(
        self,
        pinData,
        pinHeater=-1,
        boardResistance=10,
        baseVoltage=5.0,
        measuringStrategy=BaseMQ.STRATEGY_ACCURATE,
    ):
        super().__init__(
            pinData,
            pinHeater=pinHeater,
            boardResistance=boardResistance,
            baseVoltage=baseVoltage,
            measuringStrategy=measuringStrategy,
        )

    ## Measure liquefied hydrocarbon gas, LPG
    def readLPG(self):
        a, b = -0.45, 2.95
        return self.readScaled(a, b)

    ## Measure methane
    def readMethane(self):
        a, b = -0.38, 3.21
        return self.readScaled(a, b)

    ## Measure smoke
    def readSmoke(self):
        a, b = -0.42, 3.54
        return self.readScaled(a, b)

    ## Measure hydrogen
    def readHydrogen(self):
        a, b = -0.48, 3.32
        return self.readScaled(a, b)

    ## Base RO differs for every sensor family; this is the MQ-2 value
    def getRoInCleanAir(self):
        return self.MQ2_RO_BASE
# Debug marker: prints 333 once execution reaches this point, i.e. after the
# BaseMQ and MQ2 class bodies above have been executed.
print(333)
class App:
    ## Reads the MQ2 sensor in an endless loop and reports each set of
    #  readings to an HTTP endpoint as query-string parameters.

    ## Default endpoint that receives the readings
    DEFAULT_REPORT_URL = "http://192.168.1.237:9016/gas/addGas"

    ## @param pin ADC pin passed to the MQ2 driver
    #  @param device_name Name of this sensor device, sent with every report
    #  @param report_url HTTP endpoint that receives the readings
    #  @param interval_s Pause between two reporting cycles, in seconds
    def __init__(
        self,
        pin=0,
        device_name="gas_001",
        report_url=DEFAULT_REPORT_URL,
        interval_s=5,
    ):
        self.sensor = MQ2(pinData=pin, baseVoltage=3.3)
        self.device_name = device_name
        self.report_url = report_url
        self.interval_s = interval_s

    ## Calibrate the sensor, then read and report forever. Never returns.
    def Run(self):
        print("Calibrating")
        self.sensor.calibrate()
        print("Calibration completed")
        print("Base resistance:{0}".format(self.sensor._ro))
        while True:
            # Smoke concentration
            smoke_num = self.sensor.readSmoke()
            print("烟雾 Smoke: {0}".format(smoke_num))
            # Liquefied petroleum gas (mostly bottled gas)
            lpg_num = self.sensor.readLPG()
            print("液化石油气 LPG: {0}".format(lpg_num))
            # Methane (main component of natural gas)
            methane_num = self.sensor.readMethane()
            print("甲烷 Methane: {0}".format(methane_num))
            # Hydrogen (combustible gas)
            hydrogen_num = self.sensor.readHydrogen()
            print("氢气 Hydrogen: {0}".format(hydrogen_num))
            self._report(smoke_num, lpg_num, methane_num, hydrogen_num)
            utime.sleep(self.interval_s)

    ## Send one set of readings to report_url; failures are printed, not raised,
    #  so a temporary network problem does not stop the measuring loop.
    def _report(self, smoke_num, lpg_num, methane_num, hydrogen_num):
        query_data = "smoke={0}&lpg={1}&methane={2}&hydrogen={3}&device_name={4}".format(
            smoke_num, lpg_num, methane_num, hydrogen_num, self.device_name
        )
        url_str = self.report_url + "?" + query_data
        try:
            # Close the response right away: an unclosed response keeps its
            # socket open, and the loop would eventually run out of them.
            urequests.get(url_str).close()
        except Exception as exc:
            # Exception (not a bare except) so Ctrl-C can still stop the loop.
            print("http 请求异常", exc)
        else:
            print("没有异常")
# Script entry point: build an App with its default arguments and start Run(),
# whose `while True` loop does not return.
App().Run()
NO.3
数据上报

API地址

http://192.168.1.237:9016/gas/addGas 

使用树莓派部署 Flask 服务 + MySQL 数据库

路由器使用 OpenWrt,传感器数据统一通过内网发送

由树莓派设置ipv4固定地址,使用frp提供远程服务访问

查看数据库上报结果

 

NO.4
Tips

本文代码中的参数和测量值并不精确,需要根据自己的传感器进行测试和校准

本文目前只记录数据上报到数据库

后续会加入webhook实时报警通知以及web可视化图表数据查看

END