自动添加阿里云ECS安全组规则

开发人员需直连MySQL从库查询数据,公司出口没有固定IP,使用Python写个脚本定时获取出口IP,再添加到ECS的安全组规则。
网络是由4条宽带聚合到路由器,不同IP范围使用不同的宽带。所以可以登录路由器管理界面获取4条宽带的外网IP。

ECS安全组规则可通过阿里云提供的Python SDK添加。

安装阿里云 Python SDK

1
2
3
4
5
6
# 安装 requests 库
pip3 install requests
# 安装SDK核心库
pip3 install aliyun-python-sdk-core-v3
# 安装ECS SDK
pip3 install aliyun-python-sdk-ecs

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Filename: getip
# Author: xo9
# Describe: 获取公网IP

import os
import sqlite3
import requests
import json
from urllib.parse import urlencode
from itertools import chain
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ClientException
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkecs.request.v20140526.AuthorizeSecurityGroupRequest import AuthorizeSecurityGroupRequest
from aliyunsdkecs.request.v20140526.RevokeSecurityGroupRequest import RevokeSecurityGroupRequest

def addrule(i):
request = AuthorizeSecurityGroupRequest()
request.set_accept_format('json')

request.set_SourceCidrIp(i + "/32")
request.set_PortRange("3306/3306")
request.set_IpProtocol("tcp")
request.set_SecurityGroupId("安全组ID")

response = client.do_action_with_exception(request)
print(str(response, encoding='utf-8'))

def removerule(i):
request = RevokeSecurityGroupRequest()
request.set_accept_format('json')

request.set_SecurityGroupId("安全组ID")
request.set_PortRange("3306/3306")
request.set_IpProtocol("tcp")
request.set_SourceCidrIp(i + "/32")

response = client.do_action_with_exception(request)
print(str(response, encoding='utf-8'))

def getip():
s=requests.Session()
uri='http://192.168.1.1'
headers={
'Content-Type':'application/x-www-form-urlencoded; charset=UTF-8',
'Accept':'application/json,text/javascript,*/*; q=0.01',
'Referer':'%s/webpages/login.html' % uri
}
payload={
"method":"login",
"params":{
"username": "路由器用户名",
"password": "路由器加密密码"
}
}
r=s.post(uri + '/cgi-bin/luci/;stok=/login?form=login', data=urlencode({"data":json.dumps(payload)}),headers=headers,timeout=5)
stok=json.loads(r.text)['result']['stok'].encode('utf-8')
cookie=(r.headers['Set-Cookie']).split(';')[0]

headers={
'Content-Type':'application/x-www-form-urlencoded; charset=UTF-8',
'Accept':'application/json,text/javascript,*/*; q=0.01',
'Referer':'%s/webpages/index.html' % uri,
'Cookie': '%s' % cookie
}
payload={
"method":"get"
}
url='%s/cgi-bin/luci/;stok=%s/admin/interface?form=status2' % (uri,stok.decode("utf-8"))
r=s.post(url, data=urlencode({"data":json.dumps(payload)}),headers=headers, timeout=5)
wlan=[]
data_json=json.loads(r.text)
for x in [1,3,4]:
ipaddr=data_json['result']['normal'][x]['ipaddr']
wlan.append(ipaddr)
ipaddr=(requests.get('https://api.ip.sb/ip').text).strip()
wlan.append(ipaddr)
return wlan

def checkip(wlan):
con=sqlite3.connect('ipaddr.db')
cursor=con.cursor()
cursor.execute('CREATE TABLE if not exists ip(id integer primary key autoincrement,ipaddr TEXT)')
cursor.execute("SELECT ipaddr FROM ip")
wlanip=list(chain.from_iterable(cursor.fetchall()))
# 计算差集
diffip=list(set(wlan) ^ set(wlanip))
if len(diffip)==0:
pass
else:
for x in range(len(diffip)):
cursor.execute("SELECT ipaddr FROM ip WHERE ipaddr=?", (diffip[x],))
if len(cursor.fetchall())==0:
cursor.execute("INSERT INTO ip(ipaddr) VALUES(?)", (diffip[x],))
addrule(diffip[x])
else:
cursor.execute("DELETE FROM ip WHERE ipaddr=?", (diffip[x],))
removerule(diffip[x])
cursor.close()
con.commit()
con.close()

if __name__ == '__main__':
client=AcsClient('AccessKey ID','AccessKey Secret','Region ID')
iplist=getip()
checkip(iplist)
print("Ip更新完成")

运行结果


一分、两分都是爱!