파이썬 부산창조경제혁신센터,알림마당,사업공고 크롤링 소스를 공개합니다.

1) 디비 스키마는 아래와 같습니다.

* desc rnd

 * desc rnd_category




2) 개발환경은 macOS, 서비스환경은 AWS EC3 ubuntu 입니다.

3) macOS 로컬 디비환경은 mysql docker, AWS RDS는 mysql 입니다.

4) 파이썬으로 해당 사이트를 크롤링하여 디비에 저장하고 요약한 정보를 슬랙의 메시지로 보내는 소스입니다.

5) 소스는 아래와 같습니다. 간단한 소스라 주석은 달지 않았습니다.

#conifg.py

class Configuration:

  def get_configuration(choose):

    if(choose == 'mac'):
      connect_value = dict(host='127.0.0.1',
        user='xxxx',
        password='xxxx',
        database='xxxx',
        port=3306,
        charset='utf8')
      
    elif(choose == 'ubuntu'):
      connect_value = dict(host='xxxx.amazonaws.com',
        user='xxxx',
        password='xxxx',
        database='xxxx',
        port=3306,
        charset='utf8')
    else:
      print('Not Selected')
      connect_value = ''

    return connect_value

#db.py
import pymysql

class DBConnection:
  def __init__(self,host,user,password,database,charset,port):
    self.connection = pymysql.connect(
      host=host,
      user=user,
      password=password,
      db=database,
      charset=charset,
      port=port,
      cursorclass=pymysql.cursors.DictCursor)

  def exec_select_rnd(self,idx,mcd,scd):
    with self.connection.cursor() as cursor:
      query = Query().get_select_rnd(idx,mcd,scd)
      cursor.execute(query)
      for row in cursor:
        data = row.get('cnt')
    #print('data: ',data)
    return data

  def exec_select_category(self,choose,code):
    with self.connection.cursor() as cursor:
      query = Query().get_select_category(choose,code)
      cursor.execute(query)   
      for row in cursor:
        if(choose == 1):
          data = row.get('val')
        elif(choose == 2):
          data = row.get('name')
    #print('data: ',data)
    return data

  def exec_insert_rnd(self,mcd,scd,idx,title,link,period,government,posted):
    #print('exec insert crawling: ',mcd,scd,idx,title,period,government,posted)
    query = Query().get_insert_rnd(mcd,scd,idx,title,link,period,government,posted)
    with self.connection as cur:
      cur.execute(query)

  def close(self):
    self.connection.close()

  def commit(self):
    self.connection.commit()

class Query:
  def get_select_rnd(self,idx,mcd,scd):
    query = 'select \
    count(*) as cnt \
    from rnd \
    where idx={} and mcd=\'{}\' and scd=\'{}\''.format(idx,mcd,scd)

    #print('getselect: ',idx,query)
    return query

  def get_select_category(self,choose,code):
    field = ''
    if(choose == 1):
      field = 'val'
    elif(choose == 2):
      field = 'name'
    query = 'select {} from rnd_category \
      where cd=\'{}\''.format(field,code)
    #print('getselectcode: ',query)
    return query

  def get_insert_rnd(self,mcd,scd,idx,title,link,period,government,posted):
    query = 'insert into rnd (mcd,scd,idx,title,link,period,government,posted) \
    values (\'{}\',\'{}\',{},\'{}\',\'{}\',\'{}\',\'{}\',\'{}\')'.format(mcd,scd,idx,title,link,period,government,posted)
    #print('getinsert: ',query)
    return query    

#mod_creativekorea.py

import re

class Tag:

  def lately_news(soup):
    tmp = []
    tmp2 = []
    table = soup.find('table',{'class':'tbl1'})
    for extract in table.select('tr'):
      td = table.find_all('td')
      onclick = td[1].find('a').attrs['onclick']
      title = Utilities.remove_keyword(td[1].find('a').get_text().strip())
      posted = td[3].get_text().strip()
      tmp = Utilities.make_array(str(onclick),',')
      tmp2 = Utilities.make_array(str(tmp[0]),'(')
      idx = str(tmp2[1])
      string = idx + ';' + title + ';' + posted
    return string 


class Utilities:

  def make_array(string,keyword): 
    arr = []
    arr = string.split(keyword)
    return arr

  def remove_keyword(string):
    string = string.replace('\"', '')
    return string

#creativekorea.py -> python3 creativekorea.py m03 s21 -> 부산창조경제혁신센터 > 알림마당 > 사업공고 크롤링
import os
import requests
from bs4 import BeautifulSoup
from selenium import webdriver
import datetime
import sys
from config import Configuration
from db import DBConnection,Query
from mod_creativekorea import Utilities,Tag
from slacker import Slacker

class News:

  def __init__(self,mcd,scd,platform):
    #print('init')
    self.mcd = mcd
    self.scd = scd
    self.platform = platform

  def set_params(self):
    self.mcd = sys.argv[1]
    self.scd = sys.argv[2]  
    self.platform = sys.argv[3]   

  def validate(self):
    default = {
    'mcd':'m03',
    'scd':'s21',
    'platform':'mac'}

    self.mcd = default.get('mcd') if self.mcd == '' else self.mcd.lower()
    self.scd = default.get('scd') if self.scd == '' else self.scd.lower()
    self.platform = default.get('platform') if self.platform == '' else self.platform.lower()#<-add p="">
  
  def url(self):
    if self.mcd == 'm03':
      url = 'https://ccei.creativekorea.or.kr/busan/allim/allim_list.do?page=1&div_code=' 
    return url

  def send_slack(self,channel,pretext,text,title,link):
    print('slack',channel,pretext,text,title,link)

    attachments_dict = dict()
    attachments_dict['pretext'] = pretext
    attachments_dict['title'] = title
    attachments_dict['title_link'] = link
    attachments_dict['text'] = text
    attachments = [attachments_dict]

    token = 'xoxp-xxx-xxxx-xxxx-xxxx'
    slack = Slacker(token)
    slack.chat.post_message(channel=channel, text=None, attachments=attachments, as_user=True)

  def make_message(self,pretext,news_field,url):
    text = '등록일: ' + news_field[2]
    self.send_slack('#channel', pretext, text, str(news_field[1]), url)

  def crawling(self):
    #print('crawling')
    self.validate()

    try:  

      options = webdriver.ChromeOptions()
      options.add_argument('headless')
      options.add_argument('window-size=1920x1080')
      options.add_argument("disable-gpu")

      configuration = Configuration.get_configuration(self.platform)
      _host = configuration['host']
      _user = configuration['user']
      _password = configuration['password']
      _database = configuration['database']
      _port = configuration['port']
      _charset = configuration['charset'] 

      conn = DBConnection(host=_host,
        user=_user,
        password=_password,
        database=_database,
        port=_port,
        charset=_charset)

      cd_val = conn.exec_select_category(1,self.scd)
      cd_name = conn.exec_select_category(2,self.mcd)

      print('cd_val',cd_val)
      print('cd_name',cd_name)

      driver = webdriver.Chrome('/Users/gyunseul9/Bitbucket/rnd_news/chromedriver', chrome_options=options)
      driver.implicitly_wait(3)
      driver.get(self.url()+cd_val) 

      html = driver.page_source
      soup = BeautifulSoup(html, 'html.parser')

      lately_news = Tag.lately_news(soup)
      news_field = Utilities.make_array(lately_news,';')
      field_size = len(news_field)

      print('lately_news', lately_news)
      print('field_size', field_size)
      
      for i in range(0, field_size):
        print(i,news_field[i])
  
      cnt = conn.exec_select_rnd(news_field[0],self.mcd.lower(),self.scd.lower()) 

      if cnt:
        print('overlap idx field: ',cnt,self.mcd.lower(),self.scd.lower())
      else: 
        print('does not overlap idx field: ',cnt,self.mcd.lower(),self.scd.lower())   
        conn.exec_insert_rnd(self.mcd.lower(),self.scd.lower(),news_field[0],news_field[1],self.url()+cd_val,'-','-',news_field[2])

        self.make_message(cd_name,news_field,self.url()+cd_val)

    except Exception as e:
      with open('./creativekorea_error.log','a') as file:
        file.write('{} YOU GOT AN ERROR: {}\n'.format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),str(e)))                

def run():
  news = News('','','') 
  news.set_params()
  news.crawling()

if __name__ == "__main__":
  run()

댓글

이 블로그의 인기 게시물

[LINUX] CentOS 부팅시 오류 : UNEXPECTED INCONSISTENCY; RUN fsck MANUALLY

[MSSQL] 데이터베이스가 사용 중이어서 배타적으로 액서스할 수 없습니다

[JAVA] Java 프로그램에서 POI로 Excel파일을 조작하자