我們是通訊行業(yè)動力維護專業(yè),因為外市電(三相交流電)的波動會對我們的設(shè)備造成一定的影響。于是想做一個交流電監(jiān)測的應(yīng)用,監(jiān)測交流電的過壓或欠壓,所以我做了這么一個實時監(jiān)測的工具。它可以在過壓和欠壓一定程度的時候告警?;蛘哌M行長期監(jiān)測并記錄電壓數(shù)據(jù),分析可能發(fā)生過壓或欠壓的原因。
樹莓派作為一個簡單易用功能強大的計算平臺,很好地支持 Python。而且剛好有 MCC 118 數(shù)據(jù)采集模塊(HAT)可以直接擴展出數(shù)據(jù)采集功能,MCC 118 提供 8 通道模擬電壓輸入,基于樹莓派的數(shù)據(jù)采集/數(shù)據(jù)記錄系統(tǒng)。每張 MCC 118 最大采樣率為 100kS/s,可以進行單電壓點和電壓波形采集。
項目的代碼就是在 MCC DAQ HATs 提供的 SDK 示例代碼中,continuous_scan.py 的基礎(chǔ)上修改來的。實現(xiàn)一個高速的采集及告警設(shè)備,來監(jiān)測外市電的波動情況。
長期監(jiān)測主要通過長期數(shù)據(jù)記錄,分析是否有未超出告警范圍的波動,以分析供電質(zhì)量。
這只是一個工程樣機,可以進行實時(延時3秒+)告警和儲存發(fā)生告警時的電壓波形;可以進行長期監(jiān)測(不實時告警)、也可以長期監(jiān)測并實時告警。
內(nèi)部構(gòu)造如下圖,其中 MCC 118 在圖中正上方。
警告分為4種:本機的光警告、本機的聲警告、繼電器輸出的外部警告、物聯(lián)網(wǎng)平臺(OneNet)警告。
目前支持兩路三相交流電監(jiān)測,每路三相電的 A、B、C 相分別用:黃、綠、紅,發(fā)光二極管警告。
物聯(lián)網(wǎng)平臺的警告頁面。分別是:一路三相電的一相警告,以及一路三相電的三相警告。
采集到的正常交流電波形如下圖。
數(shù)據(jù)分析部分是用 NumPy 進行交流電每個周期的最大值、最小值判斷,然后確定是否報警,是否寫入硬盤。
還有一個程序每一做任何判斷,把采集到的所有數(shù)據(jù)都直接記錄到硬盤,大概一小時會生成 1.2G 的數(shù)據(jù)。
也是用 Python 寫的腳本,通過 NumPy 把數(shù)據(jù)做一個轉(zhuǎn)換 ,然后用 NumPy 的 amax 和 amin 對數(shù)值進行分析判斷。如果數(shù)據(jù)超過閾值,就寫硬盤文件,同時用 GPIO 輸出控制 LED 來發(fā)出預警信號。
以下是完整代碼,其中關(guān)鍵代碼已經(jīng)添加了注釋。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
MCC 118 Functions Demonstrated:
mcc118.a_in_scan_start
mcc118.a_in_scan_read
mcc118.a_in_scan_stop
mcc118.a_in_scan_cleanup
Purpose:
Perform a continuous acquisition on 1 or more channels.
Description:
Continuously acquires blocks of analog input data for a
user-specified group of channels until the acquisition is
stopped by the user. The last sample of data for each channel
is displayed for each block of data received from the device.
MCC118共8個通道,可以監(jiān)測兩個三相變壓器的6路市電,分別為變壓器1的A、B、C相和變壓器2的A、B、C相
"""
from __future__ import print_function
from daqhats import mcc118, OptionFlags, HatIDs, HatError
from daqhats_utils import select_hat_device, enum_mask_to_string, \
chan_list_to_mask
import os
import threading
import numpy as np
import datetime
import time
from socket import *
import RPi.GPIO as GPIO
# ===GPIO setup=====================================================================
GPIO.setmode(GPIO.BCM)
led_list=[5,6,19,16,20,21] #LED使用的GPIO
GPIO.setup(led_list, GPIO.OUT, initial=GPIO.LOW)
delta = datetime.timedelta(minutes=5)
# ===================================================================================
READ_ALL_AVAILABLE = -1
CURSOR_BACK_2 = '\x1b[2D's
ERASE_TO_END_OF_LINE = '\x1b[0K'
# ===================================================================================
arraydata=[[0 for i in range(72000)] for h in range(10)] #定義一個采樣緩存數(shù)組,72000為read_request_size*信道數(shù),10為#連續(xù)采樣的次數(shù)c_time
# =======OneNet param===================================================================
device_id = "Your 設(shè)備ID" # Your 設(shè)備ID
api_key = "Your API_KEY" # Your API_KEY
HOST = 'api.heclouds.com'
PORT = 80
BUFSIZ = 1024
ADDR = (HOST, PORT)
# =======Channel set==================================================================
# Store the channels in a list and convert the list to a channel mask that
# can be passed as a parameter to the MCC 118 functions.
channels = [0, 1, 2, 3, 4, 5]
channel_mask = chan_list_to_mask(channels)
num_channels = len(channels)
samples_per_channel = 0
options = OptionFlags.CONTINUOUS
scan_rate = 10000.0 #采樣速率 最大值為100k/信道數(shù)
read_request_size = 12000 #每通道此次采樣的點數(shù)
c_time = 10 #連續(xù)采樣的次數(shù)
timeout = 5.0
# =======上下限=================================================================
volt=220 #根據(jù)需要設(shè)定標準電壓
thread=0.2 #根據(jù)需要設(shè)定
upline=volt*(1+thread)*1.414*0.013558 #0.013558是根據(jù)電阻分壓確定的
downline=volt*(1-thread)*1.414*0.013558 #0.013558是根據(jù)電阻分壓確定的
# =======LED監(jiān)測=================================================================
base_time=datetime.datetime.now()
#定義時間臨時數(shù)組
led_stime=[base_time,base_time,base_time,base_time,base_time,base_time]
#定義狀態(tài)臨時數(shù)組
led_status=[-1,-1,-1,-1,-1,-1]
#=====================================================================================
def led_detect():
global led_stime,led_status
#變壓器的A、B、C相
transno=["Trans1A","Trans1B","Trans1C","Trans2A","Trans2B","Trans2C"]
try:
#判斷每個LED的狀態(tài)
while PORT > 0 :
time.sleep(1)
if GPIO.input(5) == GPIO.HIGH and led_status[0]<0 : ??????
led_stime[0]=datetime.datetime.now()
led_status[0]=1
if GPIO.input(6) == GPIO.HIGH and led_status[1]<0 :
led_stime[1]=datetime.datetime.now()
led_status[1]=1
if GPIO.input(19) == GPIO.HIGH and led_status[2]<0 :
led_stime[2]=datetime.datetime.now()
led_status[2]=1
if GPIO.input(16) == GPIO.HIGH and led_status[3]<0 :
led_stime[3]=datetime.datetime.now()
led_status[3]=1
if GPIO.input(20) == GPIO.HIGH and led_status[4]<0 :
led_stime[4]=datetime.datetime.now()
led_status[4]=1
if GPIO.input(21) == GPIO.HIGH and led_status[5]<0 :
led_stime[5]=datetime.datetime.now()
led_status[5]=1
#相應(yīng)引腳延時5分鐘熄滅,并向平臺發(fā)送數(shù)據(jù)清除告警
for i in range(6):
now=datetime.datetime.now()
if now > ( led_stime[i] + delta ) and led_status[i]>=0 :
GPIO.output(led_list[i], GPIO.LOW)
led_status[i]=-1
t5 = threading.Thread(target=senddata, args=(transno[i],"0"))
t5.start()
except KeyboardInterrupt:
# Clear the '^C' from the display.
print(CURSOR_BACK_2, ERASE_TO_END_OF_LINE, '\n')
print('Stopping')
# ========senddata to OneNet==============================================================
def senddata(T_dev,alm):
Content = ""
Content += "{"datastreams":[{"id": ""+T_dev+"","datapoints": [{"value": " + alm + "}]}"
Content += "]}\r\n"
value = len(Content)
data = ""
data += "POST /devices/" + device_id + "/datapoints HTTP/1.1\r\n"
data += "Host:api.heclouds.com\r\n"
data += "Connection: close\r\n"
data += "api-key:" + api_key + "\r\n"
data += "Content-Length:" + str(value) + "\r\n"
data += "\r\n"
data += Content
tcpCliSock = socket(AF_INET, SOCK_STREAM)
tcpCliSock.connect(ADDR)
tcpCliSock.send(data.encode())
#data1 = tcpCliSock.recv(BUFSIZ).decode()
#print(data1)
# =====writefile==========================================================================
def writefile(arrayatt, fnat):
print("write file ", datetime.datetime.now())
np.savetxt(fnat, arrayatt.T, fmt="%1.2f", delimiter=" ")
print("\033[0;31m","finish write ", datetime.datetime.now(),"\033[0m")
#====Analyse and writefile and Alarm=======================================================
def analyse(array,fnat):
break_1_1 = -1
break_2_1 = -1
break_1_2 = -1
break_2_2 = -1
break_1_3 = -1
break_2_3 = -1
break_1 = -1
break_2 = -1
print("analyse file ", datetime.datetime.now())
#等于read_request_size = 12000
lll = read_request_size
file1 = "1-"+fnat
file2 = "2-"+fnat
a=np.array(array)
b = np.empty([3, c_time*read_request_size], dtype=float) # 變壓器1定義一個數(shù)組
c = np.empty([3, c_time*read_request_size], dtype=float) # 變壓器2定義一個數(shù)組
#板子的數(shù)據(jù)記錄格式是:CH0、CH1、CH2、CH3、CH4、CH5、CH0、CH1、CH2、CH3、CH4、CH5、CH0、CH1、CH2、CH3、CH4、CH5、CH0、CH1、CH2、CH3、CH4、CH5、CH0、CH1、CH2、CH3、CH4、CH5、
#通過下面拆分成6個CH每個CH一個list以便進行分CH判斷
b[0] = np.concatenate((a[0, 0::6], a[1, 0::6], a[2, 0::6], a[3, 0::6], a[4, 0::6], a[5, 0::6], a[6, 0::6],a[7, 0::6], a[8, 0::6], a[9, 0::6])) #提取變壓器1,A相數(shù)據(jù)
b[1] = np.concatenate((a[0, 1::6], a[1, 1::6], a[2, 1::6], a[3, 1::6], a[4, 1::6], a[5, 1::6], a[6, 1::6],a[7, 1::6], a[8, 1::6], a[9, 1::6])) #提取變壓器1,B相數(shù)據(jù)
b[2] = np.concatenate((a[0, 2::6], a[1, 2::6], a[2, 2::6], a[3, 2::6], a[4, 2::6], a[5, 2::6], a[6, 2::6],a[7, 2::6], a[8, 2::6], a[9, 2::6])) #提取變壓器1,C相數(shù)據(jù)
c[0] = np.concatenate((a[0, 3::6], a[1, 3::6], a[2, 3::6], a[3, 3::6], a[4, 3::6], a[5, 3::6], a[6, 3::6],a[7, 3::6], a[8, 3::6], a[9, 3::6])) #提取變壓器2,A相數(shù)據(jù)
c[1] = np.concatenate((a[0, 4::6], a[1, 4::6], a[2, 4::6], a[3, 4::6], a[4, 4::6], a[5, 4::6], a[6, 4::6],a[7, 4::6], a[8, 4::6], a[9, 4::6])) #提取變壓器2,B相數(shù)據(jù)
c[2] = np.concatenate((a[0, 5::6], a[1, 5::6], a[2, 5::6], a[3, 5::6], a[4, 5::6], a[5, 5::6], a[6, 5::6],a[7, 5::6], a[8, 5::6], a[9, 5::6])) #提取變壓器2,C相數(shù)據(jù)
#=====判斷越線=================================================================================
# 200 是 scan_rate = 10000.0 除以 50(交流電的頻率)得出 每個周期 采樣200個點
# 600 是 120000/200=600 read_request_size*c_time本次總的采樣點數(shù),除以每周期200點,總共600個周期
# 如果 scan_rate = 5000.0 就是 100, alens 是 120000/100=1200
for i in range(600):
#每個CH拆分成每個正弦波周期進行峰值判斷
a1 = b[0][(i * 200):((i + 1) * 200 - 1)]
b1 = b[1][(i * 200):((i + 1) * 200 - 1)]
c1 = b[2][(i * 200):((i + 1) * 200 - 1)]
a2 = c[0][(i * 200):((i + 1) * 200 - 1)]
b2 = c[1][(i * 200):((i + 1) * 200 - 1)]
c2 = c[2][(i * 200):((i + 1) * 200 - 1)]
#每一項分別判斷上下限并分別告警
if np.amax(a1) > upline or np.amax(a1) < downline :
if break_1_1 <0:
GPIO.output(5, GPIO.HIGH) #相應(yīng)引腳置高電平,點亮LED
break_1_1 = 1
t3 = threading.Thread(target=senddata, args=("Trans1A","100")) #調(diào)用上傳進程
t3.start()
if np.amax(b1) > upline or np.amax(b1) < downline ?:
if break_1_2< 0:
GPIO.output(6, GPIO.HIGH) #相應(yīng)引腳置高電平,點亮LED
break_1_2 = 1
t3 = threading.Thread(target=senddata, args=("Trans1B","100")) #調(diào)用上傳進程
t3.start()
if np.amax(c1) > upline or np.amax(c1) < downline:
if break_1_3 < 0:
GPIO.output(19, GPIO.HIGH) #相應(yīng)引腳置高電平,點亮LED
break_1_3 = 1
t3 = threading.Thread(target=senddata, args=("Trans1C","100")) #調(diào)用上傳進程
t3.start()
if np.amax(a2) > upline or np.amax(a2) < downline:
if break_2_1 < 0:
GPIO.output(16, GPIO.HIGH) #相應(yīng)引腳置高電平,點亮LED
break_2_1=1
t3 = threading.Thread(target=senddata, args=("Trans2A","100")) #調(diào)用上傳進程
t3.start()
if np.amax(b2) > upline or np.amax(b2) < downline:
if break_2_2 < 0:
GPIO.output(20, GPIO.HIGH) #相應(yīng)引腳置高電平,點亮LED
break_2_1 =1
t3 = threading.Thread(target=senddata, args=("Trans2B","100")) #調(diào)用上傳進程
t3.start()
if np.amax(c2) > upline or np.amax(c2) < downline:
if break_2_3 < 0:
GPIO.output(21, GPIO.HIGH) #相應(yīng)引腳置高電平,點亮LED
break_2_2 = 1
t3 = threading.Thread(target=senddata, args=("Trans2C","100")) #調(diào)用上傳進程
t3.start()
#每個變壓器任何一項有告警就調(diào)用寫文件進程寫文件
if np.amax(a1) > upline or np.amax(a1) < downline or np.amax(b1) > upline or np.amax(b1) < downline or np.amax(c1) > upline or np.amax(c1) < downline:
if break_1 < 0:
t1 = threading.Thread(target=writefile, args=(b, file1,)) #調(diào)用寫文件進程
t1.start()
break_1 = 2
if np.amax(a2) > upline or np.amax(a2) < downline or np.amax(b2) > upline or np.amax(b2) < downline or np.amax(c2) > upline or np.amax(c2) < downline:
if break_2 < 0:
t1 = threading.Thread(target=writefile, args=(c, file2,)) #調(diào)用寫文件進程
t1.start()
break_2 = 2
print("\033[0;32m","analyse finish ", datetime.datetime.now(),"\033[0m") #font color
# ============================================================================================
def main():
"""
This function is executed automatically when the module is run directly.
"""
# ============================================================================================
try:
# Select an MCC 118 HAT device to use.
address = select_hat_device(HatIDs.MCC_118)
hat = mcc118(address)
'''
print('\nSelected MCC 118 HAT device at address', address)
actual_scan_rate = hat.a_in_scan_actual_rate(num_channels, scan_rate)
print('\nMCC 118 continuous scan example')
print(' Functions demonstrated:')
print(' mcc118.a_in_scan_start')
print(' mcc118.a_in_scan_read')
print(' mcc118.a_in_scan_stop')
print(' Channels: ', end='')
print(', '.join([str(chan) for chan in channels]))
print(' Requested scan rate: ', scan_rate)
print(' Actual scan rate: ', actual_scan_rate)
print(' Options: ', enum_mask_to_string(OptionFlags, options))
# ============================================================================================
try:
input('\nPress ENTER to continue ...')
except (NameError, SyntaxError):
pass
'''
# Configure and start the scan.
# Since the continuous option is being used, the samples_per_channel
# parameter is ignored if the value is less than the default internal
# buffer size (10000 * num_channels in this case). If a larger internal
# buffer size is desired, set the value of this parameter accordingly.
print('Starting scan ... Press Ctrl-C to stop\n')
hat.a_in_scan_start(channel_mask, samples_per_channel, scan_rate,options)
# ============================================================================================
try:
while True:
fna = str(datetime.datetime.now()) + ".txt"
#連續(xù)采樣10次
for i in range(c_time):
read_result = hat.a_in_scan_read(read_request_size, timeout) # read channel Data
arraydata[i]=read_result.data
if read_result.hardware_overrun:
print('\n\nHardware overrun\n')
break
elif read_result.buffer_overrun:
print('\n\nBuffer overrun\n')
break
hat.a_in_scan_stop()
hat.a_in_scan_cleanup()
hat.a_in_scan_start(channel_mask, samples_per_channel, scan_rate,options)
#調(diào)用分析進程
arraydatat = arraydata
t2 = threading.Thread(target=analyse, args=(arraydatat, fna, ))
t2.start()
except KeyboardInterrupt:
# Clear the '^C' from the display.
print(CURSOR_BACK_2, ERASE_TO_END_OF_LINE, '\n')
print('Stopping')
hat.a_in_scan_stop()
hat.a_in_scan_cleanup()
except (HatError, ValueError) as err:
print('\n', err)
if __name__ == '__main__':
#表用進程實時監(jiān)測告警狀態(tài),判斷是否到時間消除告警
t4 = threading.Thread(target=led_detect)
t4.start()
main()
程序部署
參考這篇教程安裝好 MCC DAQ HATs 的 SDK 和代碼示例。
https://shumeipai.nxez.com/2018/10/18/get-started-with-the-evaluation-for-mcc-118.html
將上面的程序保存為 main.py 然后在程序所在目錄運行下面的命令即可啟動程序。
-
交流電
+關(guān)注
關(guān)注
14文章
661瀏覽量
34032 -
樹莓派
+關(guān)注
關(guān)注
117文章
1710瀏覽量
105722
發(fā)布評論請先 登錄
相關(guān)推薦
評論