AWS Cloud, Python, Use sshtunneling to store rds data in a local database

1. config.py
-----------------------------------------------------------------------#SSH Cloud
username = 'ubuntu'
pkey = './aws.pem'
pkey_password = ''
gateway_url = 'anonymous.compute.amazonaws.com'

local_bind_address = '0.0.0.0'
local_port = 3307

#AmazoneDB
remote_bind_address = 'anonymous.rds.amazonaws.com'
remote_db_user = 'USERID'
remote_db_password = 'PASSWORD'
remote_port = 3306
select_db = 'DATABASE'

#LocalTestDB
dev_host = '192.168.0.XXX'
dev_user = 'USERID'
dev_password = 'PASSWORD'
dev_port = 3306

db_charset = 'utf8'
insert_db = 'DATABASE'

'''
use mysql
create database DATABASENAME;
grant all privileges on DATABASENAME.* to USERID@localhost identified by 'PASSWORD';
grant all privileges on DATABASENAME.* to USERID@'%' identified by 'PASSWORD';
flush privileges;
quit;
END'''

'''
create table logs (
seq int(11) not null auto_increment,
status char(1),
percentage float(4,2),
measured_at datetime default now(),
primary key(seq)
);
END'''

'''
alter table logs auto_increment=1
END'''
-----------------------------------------------------------------------
2.collectData.py
-----------------------------------------------------------------------import pymysql
import datetime

from sshtunnel import SSHTunnelForwarder
from config import username, pkey, pkey_password, local_bind_address, remote_bind_address, remote_port, local_port, gateway_url, \
    remote_db_user, remote_db_password, select_db, db_charset, dev_user, dev_password, insert_db, dev_host, dev_port


class Query:
    def __init__(self, table):
        self.table = table
        self.filters = ''

    def filter_by(self, filters):
        self.filters = 'WHERE {}'.format(filters) if not self.filters else '{} AND {}'.format(self.filters, filters)
        return self

    def get_select_query(self):

        query = 'SELECT PERCENTAGE, STATUS FROM {} WHERE STATUS <> \'\' GROUP BY STATUS) AS B LIMIT 1'.format(self.table, self.table)
     
        return query if not self.filters else '{} {}'.format(query, self.filters)
       
    def get_insert_query(self, columns):
      return 'INSERT INTO {} ({}) VALUES ({})'.format(self.table, ','.join(columns), ','.join(['%s' for i in columns]))

class DBConnection:
    def __init__(self, host, user, password, database, charset, port):
        self.connection = pymysql.connect(
            host=host,
            user=user,
            password=password,
            db=database,
            charset=charset,
            port=port,
            cursorclass=pymysql.cursors.DictCursor)

    def execute_select_query(self, table, filters=None):
        with self.connection.cursor() as cursor:
            query = Query(table).filter_by(filters).get_select_query() if filters else Query(table).get_select_query()
            cursor.execute(query)
            data = cursor.fetchall()
        return data
       
    def execute_insert_query(self, table, data):
      columns = data[0].keys()
      #print('columns: ',columns)
      values = [tuple(row.values()) for row in data]
      #print('values: ',values)
      query = Query(table).get_insert_query(columns)
      #print('query: ',query)
      with self.connection as cur:
        cur.executemany(query, values) 

    def close(self):
      self.connection.close()

    def commit(self):
      self.connection.commit()


def main():
    with SSHTunnelForwarder((gateway_url, 22),
                            ssh_username=username,
                            ssh_pkey=pkey,
                            ssh_private_key_password=pkey_password,
                            remote_bind_address=(remote_bind_address, remote_port),
                            local_bind_address=(local_bind_address, local_port)
                            ) as tunnel:
        remote_db = DBConnection(host=local_bind_address,
                                 user=remote_db_user,
                                 password=remote_db_password,
                                 database=select_db,
                                 port=local_port,
                                 charset=db_charset)

        dev_db = DBConnection(host=dev_host,
                                user=dev_user,
                                password=dev_password,
                                database=insert_db,
                                port=dev_port,
                                charset=db_charset)

        try:
         
          data = remote_db.execute_select_query(table='COLLECTDATA', filters=None)
           
          print(data)
         
          if data:
            for row in data:
              measured_at = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
              print(row.get('STATUS'),row.get('PERCENTAGE'),measured_at)           
           
              dev_db.execute_insert_query('logs', data)

        except Exception as e:
          with open('/home/project/error.log', 'a') as file:
            file.write('{} YOU GOT AN ERROR: {} during getting {}\n'.format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), str(e), table))

        dev_db.commit()
        dev_db.close()
        remote_db.close()

if __name__ == '__main__':
    main()
-----------------------------------------------------------------------

댓글

이 블로그의 인기 게시물

[LINUX] CentOS 부팅시 오류 : UNEXPECTED INCONSISTENCY; RUN fsck MANUALLY

[MSSQL] 데이터베이스가 사용 중이어서 배타적으로 액서스할 수 없습니다

구글코랩) 안전Dream 실종아동 등 검색 오픈API 소스를 공유합니다. (구글드라이브연동, 이미지 수집 소스)