這個 Slack 聊天機器人專注於自動化處理使用者在 Slack 頻道中的特定查詢指令,並與 Splunk 進行整合,以取得和傳回查詢結果。機器人會解析使用者輸入的指令,例如查詢目前 down 的管理介面或 up 的路由器,並將這些指令轉換為對應的 Splunk 查詢語法。接著,機器人會執行 Splunk 查詢,並將查詢結果格式化後回傳給 Slack 頻道,讓使用者可以直接在 Slack 中獲得所需的資訊。這個整合方案旨在簡化查詢流程,提高效率,並減少人工操作。
Slack 聊天機器人與 Splunk 整合實作
本文將介紹如何建置一個 Slack 聊天機器人,並將其與 Splunk 進行整合,以實作自動化查詢與回應。該聊天機器人能夠回應特定的使用者查詢,例如查詢目前狀態為 down 的管理介面,或是查詢管理介面為 up 的路由器。
核心 API 框架程式碼解析
在 Slack 聊天機器人的核心 API 框架中,主要實作了以下功能:
- 確認 Slack 請求: 在三秒內回應 Slack 的請求,以避免 Slack 回報「端點無法到達」。
- 避免機器人訊息迴圈: 檢查訊息是否來自同一隻機器人,若是則忽略,避免無限迴圈。
- 驗證來源: 使用 token 驗證請求的來源,確保回應來自於授權的來源。
import falcon
import json
import requests
import base64
from splunkquery import run
from splunk_alexa import alexa
from channel import channel_connect, set_data
class Bot_BECJ82A3V:
def on_get(self, req, resp):
resp.status = falcon.HTTP_200
resp.body = json.dumps({"Server is Up!"})
def on_post(self, req, resp):
data = req.bounded_stream.read()
try:
bot_id = json.loads(data)["event"]["bot_id"]
if bot_id == "BECJ82A3V":
resp.status = falcon.HTTP_200
resp.body = ""
return
except:
pass
# ...
內容解密:
on_get方法:處理 GET 請求,回傳伺服器狀態。on_post方法:處理 POST 請求,解析請求內容,檢查是否為機器人訊息,並進行驗證。bot_id檢查:避免機器人訊息迴圈,若訊息來自同一隻機器人,則忽略。
頻道互動回應實作
該程式碼負責解析特定聊天指令,並回應給使用者或頻道 ID,同時使用認證 token 向 Slack API 發送回應。
import json
import requests
import base64
from splunk_alexa import alexa
channl = ""
token = ""
resp = ""
def set_data(Channel, Token, Response):
global channl, token, resp
channl = Channel
token = Token
resp = Response
def send_data(text):
global channl, token, resp
requests.post("https://slack.com/api/chat.postMessage", data='{"channel":"' + channl + '","text":"' + text + '"}', headers={"Content-type": "application/json", "Authorization": "Bearer " + token}, verify=False)
def channel_connect(text):
global channl, token, resp
try:
arg = text.split(' ')
path = arg[0].lower()
if path in ["decode", "encode"]:
# 處理編碼/解碼指令
deencode(arg, text)
else:
result = alexa(arg, resp)
text = ""
for i in result:
for j in i.values():
text = text + ' ' + j
if text == "" or text == None:
text = "None"
send_data(text)
except:
pass
def deencode(arg, text):
global channl, token, resp
decode = arg[1]
if arg[0].lower() == "encode":
encoded = base64.b64encode(str.encode(decode))
if '[:]' in decode:
text = "Encoded string: " + encoded.decode('utf-8')
send_data(text)
else:
text = "sample string format username[:]password"
send_data(text)
else:
try:
creds = base64.b64decode(decode)
creds = creds.decode("utf-8")
# 處理解碼後的憑證
except:
text = "Error decoding the string. Check your encoded string."
send_data(text)
內容解密:
set_data函式:設定頻道、token 和回應物件的全域變數。send_data函式:向 Slack API 傳送訊息,使用指定的頻道和 token。channel_connect函式:解析使用者輸入的指令,並根據指令執行對應的操作,例如編碼/解碼或查詢 Splunk。deencode函式:處理編碼/解碼指令,將使用者輸入的字串進行 Base64 編碼或解碼。
Splunk 查詢整合
該程式碼將使用者的自然語言查詢轉換為 Splunk 查詢,並將結果回傳給 Slack 聊天頻道。例如,使用者可以查詢目前狀態為 down 的管理介面,或是查詢管理介面為 up 的路由器。
# 使用 Splunk 查詢模組執行查詢
result = alexa(arg, resp)
內容解密:
alexa函式:將使用者的自然語言查詢轉換為 Splunk 查詢,並執行查詢。- 查詢結果處理:將查詢結果格式化後,回傳給 Slack 聊天頻道。
連續整合第6章:透過聊天機器人實作網路自動化
在現代網路管理中,連續整合(Continual Integration)扮演著越來越重要的角色。本章節將探討如何透過聊天機器人實作網路自動化,涵蓋與Splunk的整合、與SolarWinds的互動,以及使用Python組態OSPF等主題。
與Slack整合的聊天機器人
首先,我們來看一個與Slack整合的聊天機器人範例。這個機器人可以根據使用者的輸入執行特定的動作,並將結果回傳到Slack聊天視窗中。
from splunkquery import run
def alexa(data, resp):
try:
string = data.split(' ')
except:
string = data
search = ' '.join(string[0:-1])
param = string[-1]
print("param" + param)
match_dict = {0: "routers management interface", 1: "routers management loopback"}
for no in range(2):
print(match_dict[no].split(' '))
print(search.split(' '))
test = list(map(lambda x: x in search.split(' '), match_dict[no].split(' ')))
print(test)
print(no)
if False in test:
pass
else:
if no in [0, 1]:
if param.lower() == "up":
query = "search%20index%3D%22main%22%20earliest%3D0%20%7C%20dedup%20interface_name%2Crouter_name%20%7C%20where%20interface_name%3D%22Loopback45%22%20%20and%20interface_status%3D%22up%22%20%7C%20table%20router_name"
elif param.lower() == "down":
query = "search%20index%3D%22main%22%20earliest%3D0%20%7C%20dedup%20interface_name%2Crouter_name%20%7C%20where%20interface_name%3D%22Loopback45%22%20%20and%20interface_status%21%3D%22up%22%20%7C%20table%20router_name"
else:
return "None"
result = run(query, resp)
return result
內容解密:
alexa函式接收兩個引數:data和resp,其中data是使用者輸入的資料,resp是用於回傳結果的物件。- 程式碼首先嘗試將
data分割成單詞,並提取最後一個單詞作為引數param。 match_dict是一個字典,用於比對使用者輸入的指令與預定義的管理介面或loopback相關指令。- 根據比對結果,程式碼會執行對應的Splunk查詢,例如檢查特定介面是否為UP或DOWN狀態。
- 查詢結果會透過
run函式執行,並將結果回傳。
Splunk查詢範例
Splunk查詢用於檢索特定資料,例如檢查介面狀態。以下是兩個範例查詢:
- 檢查UP狀態的介面:
index="main" earliest=0 | dedup interface_name,router_name | where interface_name="Loopback45" and interface_status="up" | table router_name
- 檢查非UP狀態的介面:
index="main" earliest=0 | dedup interface_name,router_name | where interface_name="Loopback45" and interface_status!="up" | table router_name
內容解密:
index="main"指定了要查詢的索引名稱。earliest=0表示從最早的時間點開始查詢。dedup interface_name,router_name用於去除重複的介面名稱和路由器名稱組合。where子句用於過濾特定的介面名稱和狀態。table router_name將結果以表格形式呈現,只顯示路由器名稱。
與SolarWinds互動
另一個範例是與SolarWinds伺服器互動,以擷取特定路由器的IP位址。
import requests
from orionsdk import SwisClient
npm_server = 'npm_serverip'
username = 'test'
password = 'test123'
verify = False
if not verify:
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
swis = SwisClient(npm_server, username, password)
keyvalue = "mytestrouter"
results = swis.query("SELECT Caption AS NodeName, IPAddress FROM Orion.Nodes WHERE NodeName =@id", id=keyvalue)
if results['results'] == []:
print("query didn't return any output. node might not be present in SolarWinds DataBase")
else:
uri = results['results'][0]['IPAddress']
print(uri)
內容解密:
- 程式碼使用
orionsdk函式庫連線到SolarWinds伺服器。 SwisClient物件用於執行SQL查詢,擷取特定節點的IP位址。- 如果查詢結果為空,表示節點可能不存在於SolarWinds資料函式庫中。
使用Python組態OSPF
最後一個範例是使用Python組態OSPF。
from netmiko import ConnectHandler
import time
ospfbaseconfig = """
router ospf 1
network 192.168.20.0 0.0.0.255 area 0
"""
ospfbaseconfig = ospfbaseconfig.split("\n")
def pushospfconfig(routerip, ospfbaseconfig):
uname = "test"
passwd = "test"
device = ConnectHandler(device_type='cisco_ios', ip=routerip, username=uname, password=passwd)
xcheck = device.send_config_set(ospfbaseconfig)
print(xcheck)
outputx = device.send_command("wr mem")
print(outputx)
device.disconnect()
def validateospf(routerip):
uname = "test"
passwd = "test"
device = ConnectHandler(device_type='cisco_ios', ip=routerip, username=uname, password=passwd)
cmds = "show ip ospf neighbor"
outputx = device.send_command(cmds)
if ("FULL/" in outputx):
print("On router " + routerip + " neighborship is full")
else:
print("On router " + routerip + " neighborship is not in FULL state")
device.disconnect()
routers = ['192.168.20.1', '192.168.20.3']
for routerip in routers:
pushospfconfig(routerip, ospfbaseconfig)
print("Now sleeping for 10 seconds....")
time.sleep(10)
for routerip in routers:
validateospf(routerip)
內容解密:
- 程式碼使用
netmiko函式庫連線到Cisco IOS裝置。 pushospfconfig函式用於推播OSPF基本組態到路由器。validateospf函式用於驗證OSPF鄰居關係是否建立。