1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
|
from datetime import datetime, timezone
from operator import itemgetter
import time
import os
import requests
from constant import *
def utc_to_local(utc_dt):
# 유튜브 API 로 받아오면 미국 시간으로 받아와짐
# 미국 > 한국으로 변경
return utc_dt.replace(tzinfo=timezone.utc).astimezone(tz=None)
def sorter(list, value):
# [{},{},..,{}] 리스트에서 딕셔너리의 밸류를 이용해 정렬
return sorted(list, key=itemgetter(value), reverse=True)
def listDivider(arr, n):
# arr배열을 n개씩 나누어 배열로 감싸 리턴합니다.
return [arr[i: i + n] for i in range(0, len(arr), n)]
class Printer:
# 1. 화면에 출력
# 2. 로컬에 저장
# 3. 시놀로지채팅에 전송
# 위 3가지를 담당하는 클래스
def __init__(self):
# 파일 저장할 디렉토리 생성
today = time.strftime('%Y-%m-%d', time.localtime(time.time()))
start = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
current = os.getcwd()
path = f'{current}/{today}'
try:
if not os.path.exists(path):
os.makedirs(path)
except OSError:
print("Error: Cannot create the directory {}".format(path))
self.fp = open(f'{path}/{start}.txt', 'w')
def saveOnLocal(self, msg):
# 출력하고 파일에 저장합니다.
self.fp.write(f'{msg}\n')
print(msg)
def sendToChat(self, msg):
# 시놀로지채팅에 보내고, 출력하고, 로컬에 저장합니다.
self.saveOnLocal(msg)
# 텍스트가 너무 길면 시놀로지챗에서 410 에러를 반환한다. n 줄씩 끊어서 배열로 리턴 (\n으로)
msgs_array = listDivider(msg.split('\n'), SYNOLOGYCHAT_LINE_SPLIT)
# 시놀로지에서 이쁘게 표시되게 변경합니다.
for msgs in msgs_array:
text = '\n'.join(msgs)
text = text.replace(SMALL_LINE, SYNOLOGY_LINE)
text = text.replace("\t",'')
text = text.replace(" "," ")
text = text.replace("닉네임","\n닉네임")
text = text.replace("| @","\n@")
# 해당 텍스트를 시놀로지 챗봇으로 보낼 수 있는 API를 만들어놓았습니다.
requests.get(SYNOLOGYAPI_URL, data={"msg":text})
time.sleep(SYNOLOGYAPI_REQUEST_PER_TIME)
def close(self):
self.fp.close()
class Youtube:
apikey = YOUTUBEAPI_KEY
videoId = ''
videoInfo = {
"title": '', # 제목
"description": '', # 설명
"viewCount": 0, # 조회수
"likeCount": 0, # 좋아요
"commentCount": 0, # 댓글수
}
comments = [{
"authorDisplayName": '', # 닉네임
"textDisplay": '', # 댓글
"publisedAt": '', # 작성일
"likeCount": '', # 좋아요
"diffTime": '', # 다음글 작성시간의 차이
"authorChannelId": '', # 유저 채널 아이디 (고유값)
"totalReplyCount": '', # 답글 수
}]
def __init__(self):
self.printer = Printer()
self.greeting()
# 유저한테 동영상 ID를 받아오고
self.videoId = self.getVideoId()
# 동영상 정보를 출력하고
self.videoInfo = self.getVideoInfo()
# 동영상의 댓글을 저장합니다.
self.comments = []
self.comments = self.getVideoComments()
# 저장된 댓글을 분석해서 몇분만에 다음 댓글이 달렸는지 저장합니다.
self.addDiffTime(self.comments)
self.start()
def start(self):
while True:
self.printer.saveOnLocal(f'\n\n{LINE}')
self.printer.saveOnLocal("유튜브 댓글 조회 프로그램\nby.배돌이의당구생활 채PD")
self.printer.saveOnLocal(f'{LINE}')
self.printer.saveOnLocal(" 0. 종료")
self.printer.saveOnLocal(" 1. 모든 댓글 조회")
self.printer.saveOnLocal(" 2. 가장 많은 좋아요 댓글 조회")
self.printer.saveOnLocal(" 3. 가장 많은 답글수 댓글 조회")
self.printer.saveOnLocal(" 4. 가장 많이 작성한 사람 조회")
self.printer.saveOnLocal(" 5. 가장 오래 버틴 댓글 조회")
self.printer.saveOnLocal(" 6. 닉네임 조회")
self.printer.saveOnLocal(" 7. 댓글 버티기 이벤트 시작")
self.printer.saveOnLocal(" 8. 다른 영상 조회하기")
self.printer.saveOnLocal(f'{LINE}')
command = input("번호를 입력해주세요 : ")
self.printer.saveOnLocal(f' > 입력된 번호 : {command}')
if command == "0":
# 종료
self.printer.saveOnLocal("프로그램을 종료합니다.")
self.printer.close()
break
elif command == "1":
# 모든 댓글 조회
os.system('clear')
self.printComments(self.comments)
elif command == "2":
# 가장 많은 좋아요
os.system('clear')
self.printMostLiker()
elif command == "3":
# 가장 많은 답글수
os.system('clear')
self.printMostReply()
elif command == "4":
# 가장 많이 작성한 사람
os.system('clear')
self.printMostCommenter()
elif command == "5":
# 가장 오래 버틴 댓글
os.system('clear')
self.printMostDiffTime()
elif command == "6":
# 유저 검색
os.system('clear')
username = input("검색할 닉네임을 적어주세요 : ")
self.printUsersComment(username)
elif command == "7":
# 버티기 이벤트 시작
os.system('clear')
print(LINE)
print("🚨 댓글 고정이 없는지 확인해주세요. 되어있다면 고정해제 해주세요.")
print("🚨 이벤트가 종료될 때 까지 프로그램은 계속 진행됩니다.")
print(LINE)
stop = input(" > 몇 분 버티기를 하실건지 입력해주세요 : ")
os.system('clear')
self.startDiffTimeEvent(stop)
elif command == "8":
# 다른 동영상 조회
os.system('clear')
self.__init__()
else:
os.system('clear')
def greeting(self):
text = f'\n{LINE}\n✋ 프로그램이 시작됩니다. {time.strftime("%y년 %m월 %d일 %H:%M:%S")}\n'
text += f'> By.배돌이의당구생활 채PD\n> 영상 댓글 이벤트\n{LINE}'
self.printer.saveOnLocal(text)
def youtubeAPIExceed(self):
# 유튜브 API가 일일 한도가 정해져 있음.
# 일일한도를 초과하면 다른 키로 변경합니다.
self.printer.sendToChat(f'🚨 유튜브 API 요청 한도 초과. {YOUTUBEAPI_RESEND_TIMEOUT}초 후 다시 시도합니다.')
if self.apikey == YOUTUBEAPI_KEY:
self.apikey = YOUTUBEAPI_SUB_KEY
elif self.apikey == YOUTUBEAPI_SUB_KEY:
self.apikey = YOUTUBEAPI_KEY
time.sleep(YOUTUBEAPI_RESEND_TIMEOUT)
def youtubeNotFound(self):
# 유튜브 url을 찾지 못했을 경우
print(f'🚨 해당 유튜브 영상 URL을 확인할 수 없습니다. 다시 시도해주세요.\n{LINE}')
self.__init__()
def getVideoId(self):
while True:
videoId = input("1️⃣ 영상 URL을 입력해주세요 : ")
# 유튜브 URL 인지 확인
try:
if 'youtu' not in videoId:
raise Exception
if 'watch?v=' in videoId:
videoId = videoId.split("=")[1]
else:
videoId = videoId.split("/")[-1]
return videoId
except Exception as e:
print("🚨 해당 영상을 확인할 수 없습니다. 다시 입력해주세요.", e)
print(LINE)
def getVideoInfo(self):
# 유저로부터 받은 동영상url이 정상적인지 확인하고 정상인 경우 동영상 정보를 리턴합니다.
res = requests.get(f"{YOUTUBEAPI_VIDEOS_URL}?key={self.apikey}&part=snippet,statistics&id={self.videoId}")
if res.status_code == 200:
data = res.json()
items = data["items"]
# url이 잘못되어도 status 200값과 함께 items는 빈 배열을 리턴합니다.
if len(items) < 1 :
self.youtubeNotFound()
title = items[0]["snippet"]["title"]
description = items[0]["snippet"]["description"].replace("\n", " ").replace("<br>"," ")
if len(description) > 25:
description = f'{description[:25]}...'
commentCount = int(items[0]["statistics"]["commentCount"])
likeCount = int(items[0]["statistics"]["likeCount"])
viewCount = int(items[0]["statistics"]["viewCount"])
msg = f'✅ 영상이 확인되었습니다.\n'
msg += f'> 제목: {title}\n'
msg += f'> 설명: {description}...\n'
msg += f"> 조회수 {format(viewCount, ',')} | 좋아요 {format(likeCount, ',')} | 댓글수 {format(commentCount, ',')}\n"
msg += LINE
self.printer.saveOnLocal(msg)
return {
"title": title,
"description": description,
"viewCount": viewCount,
"likeCount": likeCount,
"commentCount": commentCount,
}
else:
# 200을 리턴받지 않으면 유튜브 api 일일한도 초과로 403을 리턴받습니다.
self.youtubeAPIExceed()
def getVideoComments(self):
self.printer.saveOnLocal(f"2️⃣ 댓글을 가져옵니다.")
url = f"{YOUTUBEAPI_COMMENTTHREADS_URL}?key={self.apikey}&maxResults={MAXRESULTS}&part=snippet,replies&videoId={self.videoId}"
# 동영상의 댓글을 불러옵니다.
self.getComments(url, 1)
self.comments = sorted(self.comments, key=itemgetter('publishedAt'))
self.printer.saveOnLocal(f'✅ 모든 댓글이 확인되었습니다.\n{LINE}')
return self.comments
def getComments(self, url, cnt):
# 1회 요청에 100개까지 받아올 수 있고 100개가 초과하는 댓글의 동영상은 다음 페이지 토큰이 주어집니다.
# 재귀적으로 다음 페이지 토큰이 없을때 까지 실행됩니다.
percent = cnt * MAXRESULTS / self.videoInfo["commentCount"] * 100
if percent > 100:
percent = 100
currCnt = cnt * MAXRESULTS
self.printer.saveOnLocal(f"- ({percent:6.2f}%) {format(currCnt,',')}개 까지 추출중...")
response = requests.get(url).json()
try:
for item in response["items"]:
self.comments.append(self.validateComment(item))
except Exception as e:
# Exception 발생은 api 일일조회를 초과했을 때 나타납니다.
# api 키를 바꾸고 현재 댓글을 다시 조회합니다.
self.youtubeAPIExceed()
self.getComments(url, cnt)
if "nextPageToken" in response.keys():
# 다음 댓글이 있으면 재귀로 다시 불러옵니다.
nextPageToken = str(response["nextPageToken"])
self.getComments(f"{url}&pageToken={nextPageToken}", cnt + 1)
def validateComment(self, item):
# api로 받은 json 에서 필요한 값만 저장합니다.
publisedAt = item["snippet"]["topLevelComment"]["snippet"]["publishedAt"]
publisedAt = datetime.strptime(publisedAt, "%Y-%m-%dT%H:%M:%SZ")
publisedAt = datetime.strftime(utc_to_local(publisedAt), "%y-%m-%d %H:%M:%S")
comment = {
"publishedAt" : publisedAt,
"authorDisplayName" : item["snippet"]["topLevelComment"]["snippet"]["authorDisplayName"],
# <a href="about:invalid#zCSafez"></a>는 PC에서 유튜브에만 존재하는 이모티콘이 이러게 나옵니다
"textDisplay" : item["snippet"]["topLevelComment"]["snippet"]["textDisplay"].replace("<br>"," ").replace("""," ").replace('<a href="about:invalid#zCSafez"></a>','[이모티콘]'),
"likeCount" : item["snippet"]["topLevelComment"]["snippet"]["likeCount"],
"authorChannelId" : f'@{item["snippet"]["topLevelComment"]["snippet"]["authorChannelId"]["value"]}',
"totalReplyCount" : item["snippet"]["totalReplyCount"]
}
return comment
def addDiffTime(self, comments):
# 배열을 돌면서 현재댓글과 다음댓글의 시간차이를 저장합니다.
# 마지막 댓글은 현재시간과 시간차이를 저장합니다.
lastCommentDiffTime = (datetime.now() - datetime.strptime(comments[-1]["publishedAt"], "%y-%m-%d %H:%M:%S")).total_seconds()/60
comments[-1].update({"diffTime": lastCommentDiffTime})
for idx in range(0, len(comments)-1):
curr = datetime.strptime(comments[idx]["publishedAt"], "%y-%m-%d %H:%M:%S")
next = datetime.strptime(comments[idx+1]["publishedAt"], "%y-%m-%d %H:%M:%S")
diffTime = (next-curr).total_seconds()/60
# 삭제되는 댓글이 있을 수 있어서 음수가 나오면 0으로 표시합니다.
if diffTime < 0:
diffTime = 0.00
comments[idx].update({"diffTime": diffTime})
def printMostCommenter(self):
# 가장 많이 댓글 작성한 사람을 MOST_CNT번 출력합니다.
userDict = {}
for comment in self.comments:
username = f'{comment["authorDisplayName"]} | {comment["authorChannelId"]}'
if username in userDict:
userDict[username] += 1
else:
userDict[username] = 1
mostCommenter = sorted(userDict.items(), key=lambda x:x[1], reverse=True)
msg = f'\n{LINE}\n👑 가장 댓글 많이 작성한 사람 (총 {format(len(mostCommenter),",")}명)\n'
if len(mostCommenter) > MOST_CNT:
for idx in range(0, MOST_CNT):
msg += f'{mostCommenter[idx][1]}회 | {mostCommenter[idx][0]}\n'
else:
for idx in range(0, len(mostCommenter)):
msg += f'{mostCommenter[idx][1]}회 | {mostCommenter[idx][0]}\n'
self.printer.saveOnLocal(msg)
def printMostLiker(self):
# 가장 많이 좋아요 받은 댓글을 MOST_CNT번 출력합니다.
mostLiker = []
for comment in sorter(self.comments, 'likeCount'):
if comment['likeCount'] > 0:
mostLiker.append(comment)
else:
mostLiker = mostLiker[:MOST_CNT]
self.printComments(mostLiker, f'{LINE}\n👑 가장 좋아요 많이 받은 댓글')
def printMostReply(self):
# 가장 많이 답글 받은 댓글을 MOST_CNT번 출력합니다.
mostReply = []
for comment in sorter(self.comments, 'totalReplyCount'):
if comment['totalReplyCount'] > 0:
mostReply.append(comment)
else:
mostReply = mostReply[:MOST_CNT]
self.printComments(mostReply, f'{LINE}\n👑 가장 많이 답글 달린 댓글')
def printMostDiffTime(self):
# 가장 많이 버틴 댓글을 MOST_CNT번 출력합니다.
mostDiffTime = sorter(self.comments, 'diffTime')
self.printComments(mostDiffTime[:MOST_CNT], f'{LINE}\n👑 가장 오래버틴 댓글 (삭제된 댓글은 집계하지 않습니다.)')
def printUsersComment(self, username):
# 특정 유저가 작성한 댓글을 모두 출력합니다.
userComment = []
for comment in self.comments:
if comment["authorDisplayName"] == username:
userComment.append(comment)
if len(userComment) == 0:
self.printer.saveOnLocal(f'\n{LINE}\n👑 [{username}]님이 작성한 댓글이 없습니다\n')
else:
mostLike = sorter(userComment, 'likeCount')[0]["likeCount"]
mostReply = sorter(userComment, 'totalReplyCount')[0]["totalReplyCount"]
mostDiffTime = sorter(userComment, 'diffTime')[0]["diffTime"]
self.printComments(userComment, f'{LINE}\n👑 [{username}]이 작성한 댓글 (삭제된 댓글은 집계하지 않습니다.)')
self.printer.saveOnLocal(f'\n총 [{format(len(userComment), ",")}]개의 댓글을 작성하셨습니다.')
self.printer.saveOnLocal(f' 💚 [{username}]님이 받은 가장 많은 좋아요 갯수는 [{mostLike}]개 입니다')
self.printer.saveOnLocal(f' 📩 [{username}]님이 받은 가장 많은 답글 갯수는 [{mostReply}]개 입니다')
self.printer.saveOnLocal(f' ⏰ [{username}]님이 가장 오래 버틴 댓글은 [{mostDiffTime:.2f}]분 입니다')
def printComments(self, comments, type=''):
# 댓글들을 출력합니다.
self.printer.saveOnLocal(f"\n{type}\n{SMALL_LINE}")
msg = ''
for comment in comments:
msg += self.commentPrettier(comment)
msg += '\n'
self.printer.saveOnLocal(msg)
def commentPrettier(self, comment):
# 댓글을 출력할때 포맷을 정해줍니다.
msg = f'{comment["publishedAt"]}\t닉네임: {comment["authorDisplayName"]} | {comment["authorChannelId"]}\n\t\t\t댓글: {comment["textDisplay"]:.33s}\n\t\t\t좋아요: {comment["likeCount"]} | 답글수: {comment["totalReplyCount"]} | 버틴시간: {comment["diffTime"]:.2f}분\n{SMALL_LINE}'
return msg
def startDiffTimeEvent(self, stop):
# 앞으로 실시간으로 댓글을 불러옵니다
# 프로그램이 추출 이후 삭제한 댓글들도 저장됩니다
msg = f'{LINE}\n✋ *{stop}분 버티기 이벤트 시작*\n> 제목: {self.videoInfo["title"]}\n> 지금부터 {YOUTUBEAPI_REQUEST_PER_TIME}초마다 새로운 댓글을 확인합니다.\n{LINE}\n\n{SMALL_LINE}'
self.printer.sendToChat(msg)
url = f"{YOUTUBEAPI_COMMENTTHREADS_URL}?key={self.apikey}&maxResults={LIVE_MAXRESULTS}&part=snippet,replies&videoId={self.videoId}"
# 이미 추출한 댓글의 마지막 댓글은 버틴시간이 없기 때문에 -1 까지 추출합니다.
time.sleep(1)
self.printer.sendToChat(self.commentPrettier(self.comments[-2]))
while True:
try:
# 이미 추출한 댓글의 publishedAt, authorChannelId 를 합쳐서 고유 key를 생성합니다
prev = self.comments
keys = []
for p in prev:
key = f'{p["publishedAt"]}{p["authorChannelId"]}'
keys.append(key)
# comments 리스트에 받아놓는데 에러 발생하면 유튜브 api 키 변경
comments = []
response = requests.get(url).json()
try:
for item in response["items"]:
comments.append(self.validateComment(item))
except Exception as e:
print("ERROR", e)
self.youtubeAPIExceed()
# 새로운 댓글이 나오면 New 배열에 저장합니다.
found = False
new = []
for comment in comments:
key = f'{comment["publishedAt"]}{comment["authorChannelId"]}'
if key not in keys:
new.append(comment)
self.comments.append(comment)
found = True
# 새로운 댓글 -1 까지 추출 (끝은 diffTime이 없기 떄문)
winner = []
if found:
self.addDiffTime(self.comments)
for comment in self.comments[len(self.comments)-len(new)-1:-1]:
if comment["diffTime"] > int(stop):
winner.append(comment)
self.printer.sendToChat(self.commentPrettier(comment))
# 우승자가 있으면 프로그램 종료
if len(winner) > 0:
msg = (f"\n@channel\n👑 {stop}분 버티기 당첨자가 나왔습니다.\n")
msg += (f'{SMALL_LINE}\n{self.commentPrettier(winner[0])}\n')
msg += ("이벤트를 종료합니다.")
self.printer.sendToChat(msg)
break
# 계속해서 리퀘스트를 보내면 api 일일횟수가 금방 달성하기 떄문에 sleep
# 10초가 적당합니다.
time.sleep(YOUTUBEAPI_REQUEST_PER_TIME)
except Exception as e:
# 에러가 나면 api 일일횟수 초과 api 키 변경
print("ERROR", e)
self.youtubeAPIExceed()
os.system('clear')
youtube = Youtube()
|