들어가며#
다룰 내용#
낭만스키의 기능중 하나인 전국의 스키장의 모든 슬로프 현황을 실시간으로 불러와서 동일한 포맷의 형태로 제공해주는 것을 만드는 과정에 대해서 작성합니다.
슬로프 현황은 0.1초가 중요할 만큼 실시간성이 필요하지 않기 때문에 크롤링만 담당하는 크롤러 서버를 구축해서 일정 시간마다 한번씩 모든 리조트 사이트를 돌면서 DB에 저장하는 방식을 사용했습니다. 프론트에서 슬로프 현황을 요청할때는 메인 API에서 저장한 DB로 부터 데이터를 표시했습니다.
여러 시행착오 끝에 가장 빠르게, 가장 효과적인 구조로 크롤링을 하는 방법에 대해 작성했습니다.
실행 환경#
로컬에서 테스트용으로 사용되는 환경은 다음과 같습니다.
1
2
3
|
OS : MacOS Monterey (Mac Studio)
Versions : pipenv(pyenv python3.10.3), FastAPI(0.84)
Etc : Talend Api Tester, Mysql(AWS RDS)
|
requests
vs grequests
vs aiohttp
무엇이 제일 빠른가?#
왜 빨라야 할까?#
[문제점]
전국의 리조트 15곳을 순차적으로 크롤링하기 떄문에 유저가 업데이트된 정보를 요청하거나 문제가 생겼을 때 강제로 다시 크롤링 로직을 도는데 결과를 반환받기 까지 수초가 걸렸습니다.
크롤링 -> db insert -> 다음 리조트 크롤링 -> ... 반복 -> 메인 서버에서 결과값 반환
[해결방안]
aiohttp를 이용해서 비동기 방식으로 변경하고 DB에 insert 할때도 크롤링 결과를 모아 bulk_update를 이용하는 방식으로 변경하여 걸리는 시간을 0.5초 이내로 낮출 수 있었습니다.
비동기 크롤링 -> bulk_update로 한번에 insert -> 메인 서버에서 결과값 반환
[테스트방법]
requests
, grequests
, aiohttp
를 로컬에서 돌아가는 테스트 서버에 100번씩 요청을 보냈을때 속도를 비교해보고 가장 빠른 방법을 선택했습니다.
테스트로 사용할 간단한 FastAPI를 실행시켰습니다. 요청을 0.1초 후에 리턴합니다.
1
2
3
4
5
6
|
# url : http://127.0.0.1:8002/test/{num}
@app.get("/test/{num}")
def get_test(num):
import time
time.sleep(0.1)
return {"num": num}
|
requests
테스트#
테스틀 위해서 앞서 만든 url로 get 요청을 100번 보내서 평균을 내어 1개 요청에 몇초가 소요됬는지 확인해봅니다.
Install requests
1
|
pipenv install requests
|
code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
def request_test():
import requests
import time
start = time.time()
cnt = 100
for i in range(1, cnt + 1):
url = f"http://127.0.0.1:8002/test/{i}"
res = requests.get(url)
total = round(time.time() - start, 4)
average = round(total/cnt, 4)
print(f"Total -> {total}sec")
print(f"Avg -> {average}sec")
request_test()
|
result
1
2
|
Total -> 11.1368sec
Avg -> 0.1114sec
|
grequests
테스트#
grequests 는 Gevent를 이용하여 비동기 http request를 할 수 있는 라이브러리입니다. https://github.com/spyoungtech/grequests 마찬가지로 100번의 요청을 보내봤습니다.
install grequests
1
|
pipenv install grequests
|
code
1
2
3
4
5
6
7
8
9
10
11
12
13
|
def grequest_test():
import grequests
import time
cnt = 100
urls = [f"http://127.0.0.1:8002/test/{i}" for i in range(1, cnt+1)]
start = time.time()
rs = (grequests.get(u) for u in urls)
total = round(time.time() - start, 4)
average = round(total/cnt, 4)
print(f"Total -> {total}sec")
print(f"Avg -> {average}sec")
grequest_test()
|
result
1
2
|
Total -> 0.4132sec
Avg -> 0.0041sec
|
aiohttp
테스트#
비동기 크롤링에 가장 보편적으로 많이 사용되는 라이브러리 입니다. 100번의 요청을 보냅니다.
install aiohttp
100번의 요청을 보내봅니다.
code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
import asyncio
import time
from aiohttp import ClientSession
async def aiohttp_test(url):
async with ClientSession() as session:
async with session.get(url) as response:
return await response.read()
cnt = 100
start = time.time()
loop = asyncio.get_event_loop()
coroutines = [aiohttp_test(f'http://127.0.0.1:8002/test/{i}') for i in range(cnt)]
results = loop.run_until_complete(asyncio.gather(*coroutines))
total = round(time.time() - start, 4)
average = round(total/cnt, 4)
print(f"Total -> {total}sec")
print(f"Avg -> {average}sec")
|
result
1
2
|
Total -> 0.3705sec
Avg -> 0.0037sec
|
속도 비교 결과#
1개의 요청을 처리하는데 걸리는 평균 시간
- requests : 0.1114sec
- grequests : 0.0041sec
- aiohttp : 0.0037sec 🏆
라이브러리 선택#
여러번 계속 테스트 해봐도 aiohttp
가 가장 빨랐습니다. 내부망에서 왔다갔다 하는거라 큰 차이가 나진 않았던 것 같습니다. 내부망이 아닌 request 테스트 사이트 requestcatcher에서 테스트해본 결과도 동일하였습니다.
0.1초후에 응답하는 api에 100번의 요청을 보냈을 때 싱글스레드(requests)는 평균 0.1114sec 초로 한개의 요청을 끝내고 다음 요청을 진행하는 것을 볼 수 있었습니다. 비동기를 이용했을때는 평균 약 0.004초가 걸렸습니다. 제 환경에서는 약 25개의 요청을 동시에 처리할 수 있다는 점을 알 수 있었습니다.
크롤링하는 서버의 성능에 따라 인터넷 속도나 얼만큼 많은 쓰레드를 사용할 수 있는가에 따라 속도 차이가 좀 더 나겠지만, 전국 스키장의 수가 20개가 되지 않으므로 비동기를 이용하여 모든 스키장을 크롤링을 동시에 진행할 것이고 최소 시간은 가장 크롤링이 오래걸리는 스키장이 될 것으로 예상했습니다.
FastAPI
+ aiohttp
로 크롤링하기#
FastAPI 선택 이유#
크롤링 서버는 메인서버에서 분리되어 일정 시간마다 크롤링 > DB Insert
만 하는 아주 가볍게 돌아가는 서버이기 떄문에 Django
나 Flask
보다 가벼운 FastAPI
를 선택했습니다. 간혹 오류나 크롤링 실패로 인해서 관리자나 프론트에서 유저가 다시 크롤링하라는 요청을 보내기 위해 get 요청을 받을 수 있게 api로 진행했습니다.
프로젝트 트리 구조#
스키장별로 제공하는 홈페이지가 모두 달랐습니다. 정적페이지로 제공하는지, API가 있는지 등 스키장마다 모두 다른 형태로 리턴받아서 구조와 클린코드를 어떻게 짜야할지 고민이 많았습니다. 같은 형식이 아닌 다양한 여러 페이지를 가장 빠르게 크롤링하고 어떤식으로 구조를 잡았는지, DB에는 어떻게 효과적으로 Insert 했는지 과정을 소개합니다.
FastAPI 프로젝트 tree
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
.
├── Dockerfile
├── docker-compose.yml
├── .env
├── requirements.txt
├── scripts
│ └── db_to_orm.sh
└── src
├── config (RDS 연결)
│ ├── __init__.py
│ └── database.py
├── constant.py (상수값 저장)
├── crawler (크롤링)
│ ├── __init__.py
│ ├── crawler.py
├── main.py (api)
├── model (DB모델)
│ ├── __init__.py
│ └── models.py
├── service (DB insert)
│ ├── __init__.py
│ └── slope_time_service.py
└── utils (결과 Discord 전송)
└── webhook.py
|
main.py
에서 get 요청을 처리합니다. 크롤링과 관련된 내용은 crawler.py
에 저장하고 크롤링에 필요한 상수값 (resort_code, resort_name 등)은 DB에서 불러와도 되지만 상수로 저장해놓고 사용하는게 더 빠르기 때문에 constant.py
에 저장했습니다.
처리순서는 다음과 같습니다.
1
2
3
4
5
6
7
8
|
1. 크롤링 요청
2. aiohttp client를 생성하고
3. 코루틴 리스트 배열을 만들고
4. 비동기로 모든 리조트를 크롤링 해서
5. aiohttp client를 종료 후
6. 결과값을 배열에 모아서
7. bulk_update로 DB에 저장하고
8. 크롤링 결과를 Discord에 푸쉬
|
cralwer > crawlerV3.py#
여러번 테스트 결과 aiohttp 적용이 가장 빨랐다. (좌측:requests, 우측:aiohttp)
class SingletonAiohttp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
class SingletonAiohttp:
sem: Optional[asyncio.Semaphore] = None
aiohttp_client: Optional[aiohttp.ClientSession] = None
@classmethod
def get_aiohttp_client(cls) -> aiohttp.ClientSession:
if cls.aiohttp_client is None:
timeout = aiohttp.ClientTimeout(total=5)
connector = aiohttp.TCPConnector(
family=AF_INET, limit_per_host=SIZE_POOL_AIOHTTP, ssl=False
)
cls.aiohttp_client = aiohttp.ClientSession(
timeout=timeout, connector=connector, trust_env=True
)
return cls.aiohttp_client
@classmethod
async def close_aiohttp_client(cls) -> None:
if cls.aiohttp_client:
await cls.aiohttp_client.close()
cls.aiohttp_client = None
@classmethod
async def crawl(cls, resortName: ResortName) -> Any:
client = cls.get_aiohttp_client()
resort = Resort()
try:
async with client.get(url=FakeDB[resortName]["url"]) as response:
if response.status != 200:
return {"ERROR OCCURED" + str(await response.text())}
html = await response.text()
if resortName == ResortName.jisan:
result = resort.jisan(html)
# ...
elif resortName == ResortName.otwo:
result = resort.otwo(html)
return result
except Exception as e:
return {"ERROR": e}
...
async def on_start_up() -> None:
fastAPI_logger.info("on_start_up")
SingletonAiohttp.get_aiohttp_client()
async def on_shutdown() -> None:
fastAPI_logger.info("on_shutdown")
await SingletonAiohttp.close_aiohttp_client()
|
class Utility
스키장에 따라서 어떤 곳은 슬로프명이 x축에, 어떤 곳은 y축에 있었고 어떤 곳은 슬로프 오픈 정보와 슬로프 난이도 길이 등을 같이 표시하는 곳이 있었습니다. 이런 데이터들을 규격화 하는 과정이 필요했고 크롤링 하면 무조건 2차원 배열로 [[구분, 슬로프1, 슬로프2], [시간,O,X,..], .. ,[시간,O,X,..]]
만들어서 크롤링이 성공하면 inSuccess 함수에서 DB에 넣는 형식으로 리턴해주었습니다. 크롤링하다가 Exception 발생시에는 해당 리조트의 크롤링 여부가 실패했음을 inFailed 함수가 실행됩니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
class Utiliy:
def inSuccess(self, resort_name, array):
resortContstantDB = ResortConstant[resort_name]
# ... DB 형식에 맞게 수정 array 인자는 무조건 2차원배열로 들어온다
return {
"fetch_status": {
"resort_code": resortContstantDB["resort_code"],
"fetch_status": "O",
},
"slope_open_yn": result,
}
def inFailed(self, resort_name):
resortContstantDB = ResortConstant[resort_name]
return {
"fetch_status": {
"resort_code": resortContstantDB["resort_code"],
"fetch_status": "X",
},
"slope_open_yn": [],
}
|
class Reosrt
리조트별로 크롤링 해서 2차원 배열로 리턴합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
class Resort(Utiliy):
# API로 불러올 경우 JSON
def wellyhilly(self, res):
resort_name = ResortName.duckyousan
response = json.loads(res)
# ...
try:
datas = response["data"]
# ... 형식에 맞게 수정
return self.makeDbInputData(resort_name, array)
except Exception as e:
print("ERROR : ", e)
return self.fetchFailed(resort_name)
# 정적페이지인 경우
def otwo(self, html):
resort_name = ResortName.otwo
# ...
try:
html = BeautifulSoup(html, "lxml")
# ... 크롤링
return self.makeDbInputData(resort_name, array)
# return self.fetchFailed(resort_name)
except Exception as e:
print("ERROR : ", e)
return self.fetchFailed(resort_name)
|
config > database.py#
.env에서 rds 주소를 불러와서 db연결 세션을 만듭니다.
1
2
3
4
5
6
7
8
9
10
11
|
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from os import getenv
SQLALCHEMY_DATABASE_URL = getenv("DATABASE_URL")
engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
|
service > slope_time_service.py
성공, 실패 여부를 모아놓은 fetch_status_list
와 슬로프 현황이 담긴 slope_open_yn_list
를 db에 넣어줍니다. bulk_update를 이용하면 한번의 커넥션으로 데이터들을 전송할 수 있습니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
from sqlalchemy.orm import Session, aliased
from sqlalchemy import select
from sqlalchemy import and_
from ..config.database import SessionLocal, engine
from contextlib import contextmanager
from fastapi.concurrency import contextmanager_in_threadpool
from ..model import models
models.Base.metadata.create_all(bind=engine)
def get_db():
db = SessionLocal()4
try:
yield db
finally:
db.close()4
async def update_slope_time_bulk(fetch_status_list: list, slope_open_yn_list: list):
async with contextmanager_in_threadpool(contextmanager(get_db)()) as db:
try:
db.bulk_update_mappings(models.SkiResort, fetch_status_list)
db.bulk_update_mappings(models.SlopeTime, slope_open_yn_list)
db.commit()
except Exception as e:
print("update_slope_time_bulk Exeption :", e)
db.rollback()
raise
|
main.py#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
app = FastAPI(docs_url="/", on_startup=[on_start_up], on_shutdown=[on_shutdown])
@app.get("/slope/update")
async def fasterUpdateSlope() -> Dict[str, int]:
start = time.time()
# Async
async_calls: List[Coroutine[Any, Any, Any]] = list() # store all async operations
async_calls.append(SingletonAiohttp.crawl(ResortName.jisan))
# ...
async_calls.append(SingletonAiohttp.crawl(ResortName.duckyousan))
results = await asyncio.gather(*async_calls) # wait for all async operations
# Make Data
....
# DB Push & Discord
try:
await slope_time_service.update_slope_time_bulk(
fetch_status_list, slope_open_yn_list
)
# ...
if not DEBUG:
discord_webhook(discord_title, discord_msg)
return {
"fetch_status_list": fetch_status_list,
"slope_open_yn_list": slope_open_yn_list,
}
except Exception:
# ...
if not DEBUG:
discord_webhook(discord_title, discord_msg + "`")
raise HTTPException(status_code=500, detail="Server error")
|
8개 정도 리조트를 크롤링 했을때 0.3초 ~ 0.4초
정도 걸렸습니다. RDS를 아직 저사양 인스턴스를 사용해서 db insert 시간이 더 길었습니다.
슬로프 오픈 현황 (open_yn)
크롤링 성공 여부 (실패시 마지막 성공 데이터를 보여줍니다)
디스코드로 결과 보내기#
결과를 팀원들에게 실시간으로 공유하고 오류가 있을때는 @channel
과 함께 디스코드로 푸쉬를 보냅니다.d
결과는 discord로 받습니다. Rds 성능이 낮아서 DB Insert 속도가 조금 느립니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
|
import requests
from os import getenv
from dotenv import load_dotenv
def discord_webhook(title, msg):
load_dotenv()
url = getenv("DISCORD_CRAWLER_MONITOR_URL")
data = {
"content": msg,
"username": title,
}
requests.post(url, json=data)
|
CSR 페이지 공략#
CSR로 만들어져서 브라우져가 렌더링하는 페이지일 경우에 최대한 셀레늄을 사용하지 않으려고 선택한 방법입니다. (…계속)