AWS Cloud, Python, Use sshtunneling to store rds data in a local database
1. config.py
-----------------------------------------------------------------------#SSH Cloud
username = 'ubuntu'
pkey = './aws.pem'
pkey_password = ''
gateway_url = 'anonymous.compute.amazonaws.com'
local_bind_address = '0.0.0.0'
local_port = 3307
#AmazoneDB
remote_bind_address = 'anonymous.rds.amazonaws.com'
remote_db_user = 'USERID'
remote_db_password = 'PASSWORD'
remote_port = 3306
select_db = 'DATABASE'
#LocalTestDB
dev_host = '192.168.0.XXX'
dev_user = 'USERID'
dev_password = 'PASSWORD'
dev_port = 3306
db_charset = 'utf8'
insert_db = 'DATABASE'
'''
use mysql
create database DATABASENAME;
grant all privileges on DATABASENAME.* to USERID@localhost identified by 'PASSWORD';
grant all privileges on DATABASENAME.* to USERID@'%' identified by 'PASSWORD';
flush privileges;
quit;
END'''
'''
create table logs (
seq int(11) not null auto_increment,
status char(1),
percentage float(4,2),
measured_at datetime default now(),
primary key(seq)
);
END'''
'''
alter table logs auto_increment=1
END'''-----------------------------------------------------------------------
2.collectData.py
-----------------------------------------------------------------------import pymysql
import datetime
from sshtunnel import SSHTunnelForwarder
from config import username, pkey, pkey_password, local_bind_address, remote_bind_address, remote_port, local_port, gateway_url, \
remote_db_user, remote_db_password, select_db, db_charset, dev_user, dev_password, insert_db, dev_host, dev_port
class Query:
def __init__(self, table):
self.table = table
self.filters = ''
def filter_by(self, filters):
self.filters = 'WHERE {}'.format(filters) if not self.filters else '{} AND {}'.format(self.filters, filters)
return self
def get_select_query(self):
query = 'SELECT PERCENTAGE, STATUS FROM {} WHERE STATUS <> \'\' GROUP BY STATUS) AS B LIMIT 1'.format(self.table, self.table)
return query if not self.filters else '{} {}'.format(query, self.filters)
def get_insert_query(self, columns):
return 'INSERT INTO {} ({}) VALUES ({})'.format(self.table, ','.join(columns), ','.join(['%s' for i in columns]))
class DBConnection:
def __init__(self, host, user, password, database, charset, port):
self.connection = pymysql.connect(
host=host,
user=user,
password=password,
db=database,
charset=charset,
port=port,
cursorclass=pymysql.cursors.DictCursor)
def execute_select_query(self, table, filters=None):
with self.connection.cursor() as cursor:
query = Query(table).filter_by(filters).get_select_query() if filters else Query(table).get_select_query()
cursor.execute(query)
data = cursor.fetchall()
return data
def execute_insert_query(self, table, data):
columns = data[0].keys()
#print('columns: ',columns)
values = [tuple(row.values()) for row in data]
#print('values: ',values)
query = Query(table).get_insert_query(columns)
#print('query: ',query)
with self.connection as cur:
cur.executemany(query, values)
def close(self):
self.connection.close()
def commit(self):
self.connection.commit()
def main():
with SSHTunnelForwarder((gateway_url, 22),
ssh_username=username,
ssh_pkey=pkey,
ssh_private_key_password=pkey_password,
remote_bind_address=(remote_bind_address, remote_port),
local_bind_address=(local_bind_address, local_port)
) as tunnel:
remote_db = DBConnection(host=local_bind_address,
user=remote_db_user,
password=remote_db_password,
database=select_db,
port=local_port,
charset=db_charset)
dev_db = DBConnection(host=dev_host,
user=dev_user,
password=dev_password,
database=insert_db,
port=dev_port,
charset=db_charset)
try:
data = remote_db.execute_select_query(table='COLLECTDATA', filters=None)
print(data)
if data:
for row in data:
measured_at = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(row.get('STATUS'),row.get('PERCENTAGE'),measured_at)
dev_db.execute_insert_query('logs', data)
except Exception as e:
with open('/home/project/error.log', 'a') as file:
file.write('{} YOU GOT AN ERROR: {} during getting {}\n'.format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), str(e), table))
dev_db.commit()
dev_db.close()
remote_db.close()
if __name__ == '__main__':
main()-----------------------------------------------------------------------
-----------------------------------------------------------------------#SSH Cloud
username = 'ubuntu'
pkey = './aws.pem'
pkey_password = ''
gateway_url = 'anonymous.compute.amazonaws.com'
local_bind_address = '0.0.0.0'
local_port = 3307
#AmazoneDB
remote_bind_address = 'anonymous.rds.amazonaws.com'
remote_db_user = 'USERID'
remote_db_password = 'PASSWORD'
remote_port = 3306
select_db = 'DATABASE'
#LocalTestDB
dev_host = '192.168.0.XXX'
dev_user = 'USERID'
dev_password = 'PASSWORD'
dev_port = 3306
db_charset = 'utf8'
insert_db = 'DATABASE'
'''
use mysql
create database DATABASENAME;
grant all privileges on DATABASENAME.* to USERID@localhost identified by 'PASSWORD';
grant all privileges on DATABASENAME.* to USERID@'%' identified by 'PASSWORD';
flush privileges;
quit;
END'''
'''
create table logs (
seq int(11) not null auto_increment,
status char(1),
percentage float(4,2),
measured_at datetime default now(),
primary key(seq)
);
END'''
'''
alter table logs auto_increment=1
END'''-----------------------------------------------------------------------
2.collectData.py
-----------------------------------------------------------------------import pymysql
import datetime
from sshtunnel import SSHTunnelForwarder
from config import username, pkey, pkey_password, local_bind_address, remote_bind_address, remote_port, local_port, gateway_url, \
remote_db_user, remote_db_password, select_db, db_charset, dev_user, dev_password, insert_db, dev_host, dev_port
class Query:
def __init__(self, table):
self.table = table
self.filters = ''
def filter_by(self, filters):
self.filters = 'WHERE {}'.format(filters) if not self.filters else '{} AND {}'.format(self.filters, filters)
return self
def get_select_query(self):
query = 'SELECT PERCENTAGE, STATUS FROM {} WHERE STATUS <> \'\' GROUP BY STATUS) AS B LIMIT 1'.format(self.table, self.table)
return query if not self.filters else '{} {}'.format(query, self.filters)
def get_insert_query(self, columns):
return 'INSERT INTO {} ({}) VALUES ({})'.format(self.table, ','.join(columns), ','.join(['%s' for i in columns]))
class DBConnection:
def __init__(self, host, user, password, database, charset, port):
self.connection = pymysql.connect(
host=host,
user=user,
password=password,
db=database,
charset=charset,
port=port,
cursorclass=pymysql.cursors.DictCursor)
def execute_select_query(self, table, filters=None):
with self.connection.cursor() as cursor:
query = Query(table).filter_by(filters).get_select_query() if filters else Query(table).get_select_query()
cursor.execute(query)
data = cursor.fetchall()
return data
def execute_insert_query(self, table, data):
columns = data[0].keys()
#print('columns: ',columns)
values = [tuple(row.values()) for row in data]
#print('values: ',values)
query = Query(table).get_insert_query(columns)
#print('query: ',query)
with self.connection as cur:
cur.executemany(query, values)
def close(self):
self.connection.close()
def commit(self):
self.connection.commit()
def main():
with SSHTunnelForwarder((gateway_url, 22),
ssh_username=username,
ssh_pkey=pkey,
ssh_private_key_password=pkey_password,
remote_bind_address=(remote_bind_address, remote_port),
local_bind_address=(local_bind_address, local_port)
) as tunnel:
remote_db = DBConnection(host=local_bind_address,
user=remote_db_user,
password=remote_db_password,
database=select_db,
port=local_port,
charset=db_charset)
dev_db = DBConnection(host=dev_host,
user=dev_user,
password=dev_password,
database=insert_db,
port=dev_port,
charset=db_charset)
try:
data = remote_db.execute_select_query(table='COLLECTDATA', filters=None)
print(data)
if data:
for row in data:
measured_at = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(row.get('STATUS'),row.get('PERCENTAGE'),measured_at)
dev_db.execute_insert_query('logs', data)
except Exception as e:
with open('/home/project/error.log', 'a') as file:
file.write('{} YOU GOT AN ERROR: {} during getting {}\n'.format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), str(e), table))
dev_db.commit()
dev_db.close()
remote_db.close()
if __name__ == '__main__':
main()-----------------------------------------------------------------------
댓글