들어가며

유튜브 채널 팩토리 님께서 진행해주신 레전드 컨텐츠 이벤트가 있습니다. (아마도 팩토리님은 개발자일지도)

아직도 안끝나고 댓글이 150만개가 달려서 세계10위에 들었다고 합니다

아직도 안끝나고 댓글이 150만개가 달려서 세계10위에 들었다고 합니다


배돌이의 당구생활 유튜브에서도 비슷한 다양한 이벤트들을 진행중입니다.

  1. 라이브에서 실시간 추첨 (참고글)
  2. 컨텐츠에 댓글달고 오랫동안 버틴사람
  3. 컨텐츠 댓글중 좋아요가 가장 많은 사람
  4. 컨텐츠 댓글중 답글이 제일 많은 사람
  5. 컨텐츠에 댓글을 가장 많이 작성한 사람

위 뿐만 아니라 여러가지 이벤트들을 진행&기획중으로 프로그램을 제작하게 되었습니다.


코드 & 코드설명

constant.py

상수를 모아놓은 파일입니다.

constant.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19

LINE = "=" * 28
SMALL_LINE = '-' * 65
SYNOLOGY_LINE = '-' * 38

YOUTUBEAPI_KEY = ""
YOUTUBEAPI_SUB_KEY = ""
YOUTUBEAPI_VIDEOS_URL = "https://www.googleapis.com/youtube/v3/videos"
YOUTUBEAPI_COMMENTTHREADS_URL = "https://www.googleapis.com/youtube/v3/commentThreads"
YOUTUBEAPI_REQUEST_PER_TIME = 10
YOUTUBEAPI_RESEND_TIMEOUT = 3
MOST_CNT = 5
MAXRESULTS = 100
LIVE_MAXRESULTS = 30

# SYNOLOGY CHAT에 보낼 경우
SYNOLOGYAPI_URL = ""    # 메세지를 처리하는 api를 따로 만들어야합니다.
SYNOLOGYAPI_REQUEST_PER_TIME = 0.5
SYNOLOGYCHAT_LINE_SPLIT = 5

crwaler.py

프로그램 코어입니다. 설명은 주석으로 달아놓았습니다.

crwaler.py
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
from datetime import datetime, timezone
from operator import itemgetter
import time
import os
import requests
from constant import *


def utc_to_local(utc_dt):
    # 유튜브 API 로 받아오면 미국 시간으로 받아와짐
    # 미국 > 한국으로 변경
    return utc_dt.replace(tzinfo=timezone.utc).astimezone(tz=None)

def sorter(list, value):
    # [{},{},..,{}] 리스트에서 딕셔너리의 밸류를 이용해 정렬
    return sorted(list, key=itemgetter(value), reverse=True)

def listDivider(arr, n):
    # arr배열을 n개씩 나누어 배열로 감싸 리턴합니다.
    return [arr[i: i + n] for i in range(0, len(arr), n)]

class Printer:
    # 1. 화면에 출력
    # 2. 로컬에 저장
    # 3. 시놀로지채팅에 전송
    # 위 3가지를 담당하는 클래스

    def __init__(self):
        # 파일 저장할 디렉토리 생성
        today = time.strftime('%Y-%m-%d', time.localtime(time.time()))
        start = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
        current = os.getcwd()
        path = f'{current}/{today}'
        try:
            if not os.path.exists(path):
                os.makedirs(path)
        except OSError:
            print("Error: Cannot create the directory {}".format(path))
        self.fp = open(f'{path}/{start}.txt', 'w')


    def saveOnLocal(self, msg):
        # 출력하고 파일에 저장합니다.
        self.fp.write(f'{msg}\n')
        print(msg)

    def sendToChat(self, msg):
        # 시놀로지채팅에 보내고, 출력하고, 로컬에 저장합니다.
        self.saveOnLocal(msg)
        # 텍스트가 너무 길면 시놀로지챗에서 410 에러를 반환한다. n 줄씩 끊어서 배열로 리턴 (\n으로)
        msgs_array = listDivider(msg.split('\n'), SYNOLOGYCHAT_LINE_SPLIT)
        # 시놀로지에서 이쁘게 표시되게 변경합니다.
        for msgs in msgs_array:
            text = '\n'.join(msgs)
            text = text.replace(SMALL_LINE, SYNOLOGY_LINE)
            text = text.replace("\t",'')
            text = text.replace("  "," ")
            text = text.replace("닉네임","\n닉네임")
            text = text.replace("| @","\n@")
            # 해당 텍스트를 시놀로지 챗봇으로 보낼 수 있는 API를 만들어놓았습니다.
            requests.get(SYNOLOGYAPI_URL, data={"msg":text})
            time.sleep(SYNOLOGYAPI_REQUEST_PER_TIME)

    def close(self):
        self.fp.close()

class Youtube:
    apikey = YOUTUBEAPI_KEY
    videoId = ''
    videoInfo = {
        "title": '',        # 제목
        "description": '',  # 설명
        "viewCount": 0,     # 조회수
        "likeCount": 0,     # 좋아요
        "commentCount": 0,  # 댓글수
    }
    comments = [{
        "authorDisplayName": '',    # 닉네임
        "textDisplay": '',          # 댓글
        "publisedAt": '',           # 작성일
        "likeCount": '',            # 좋아요
        "diffTime": '',             # 다음글 작성시간의 차이
        "authorChannelId": '',      # 유저 채널 아이디 (고유값)
        "totalReplyCount": '',      # 답글 수
    }]

    def __init__(self):
        self.printer = Printer()
        self.greeting()
        # 유저한테 동영상 ID를 받아오고
        self.videoId = self.getVideoId()
        # 동영상 정보를 출력하고
        self.videoInfo = self.getVideoInfo()
        # 동영상의 댓글을 저장합니다.
        self.comments = []
        self.comments = self.getVideoComments()
        # 저장된 댓글을 분석해서 몇분만에 다음 댓글이 달렸는지 저장합니다.
        self.addDiffTime(self.comments)
        self.start()

    def start(self):
        while True:
            self.printer.saveOnLocal(f'\n\n{LINE}')
            self.printer.saveOnLocal("유튜브 댓글 조회 프로그램\nby.배돌이의당구생활 채PD")
            self.printer.saveOnLocal(f'{LINE}')
            self.printer.saveOnLocal("  0. 종료")
            self.printer.saveOnLocal("  1. 모든 댓글 조회")
            self.printer.saveOnLocal("  2. 가장 많은 좋아요 댓글 조회")
            self.printer.saveOnLocal("  3. 가장 많은 답글수 댓글 조회")
            self.printer.saveOnLocal("  4. 가장 많이 작성한 사람 조회")
            self.printer.saveOnLocal("  5. 가장 오래 버틴 댓글 조회")
            self.printer.saveOnLocal("  6. 닉네임 조회")
            self.printer.saveOnLocal("  7. 댓글 버티기 이벤트 시작")
            self.printer.saveOnLocal("  8. 다른 영상 조회하기")
            self.printer.saveOnLocal(f'{LINE}')
            command = input("번호를 입력해주세요 : ")
            self.printer.saveOnLocal(f' > 입력된 번호 : {command}')
            if command == "0":
                # 종료
                self.printer.saveOnLocal("프로그램을 종료합니다.")
                self.printer.close()
                break
            elif command == "1":
                # 모든 댓글 조회
                os.system('clear')
                self.printComments(self.comments)
            elif command == "2":
                # 가장 많은 좋아요
                os.system('clear')
                self.printMostLiker()
            elif command == "3":
                # 가장 많은 답글수
                os.system('clear')
                self.printMostReply()
            elif command == "4":
                # 가장 많이 작성한 사람
                os.system('clear')
                self.printMostCommenter()
            elif command == "5":
                # 가장 오래 버틴 댓글
                os.system('clear')
                self.printMostDiffTime()
            elif command == "6":
                # 유저 검색
                os.system('clear')
                username = input("검색할 닉네임을 적어주세요 : ")
                self.printUsersComment(username)
            elif command == "7":
                # 버티기 이벤트 시작
                os.system('clear')
                print(LINE)
                print("🚨   댓글 고정이 없는지 확인해주세요. 되어있다면 고정해제 해주세요.")
                print("🚨   이벤트가 종료될 때 까지 프로그램은 계속 진행됩니다.")
                print(LINE)
                stop = input("  > 몇 분 버티기를 하실건지 입력해주세요 : ")
                os.system('clear')
                self.startDiffTimeEvent(stop)
            elif command == "8":
                # 다른 동영상 조회
                os.system('clear')
                self.__init__()
            else:
                os.system('clear')

    def greeting(self):
        text = f'\n{LINE}\n✋ 프로그램이 시작됩니다. {time.strftime("%y년 %m월 %d일 %H:%M:%S")}\n'
        text += f'>  By.배돌이의당구생활 채PD\n>  영상 댓글 이벤트\n{LINE}'
        self.printer.saveOnLocal(text)

    def youtubeAPIExceed(self):
        # 유튜브 API가 일일 한도가 정해져 있음.
        # 일일한도를 초과하면 다른 키로 변경합니다.
        self.printer.sendToChat(f'🚨 유튜브 API 요청 한도 초과. {YOUTUBEAPI_RESEND_TIMEOUT}초 후 다시 시도합니다.')
        if self.apikey == YOUTUBEAPI_KEY:
            self.apikey = YOUTUBEAPI_SUB_KEY
        elif self.apikey == YOUTUBEAPI_SUB_KEY:
            self.apikey = YOUTUBEAPI_KEY
        time.sleep(YOUTUBEAPI_RESEND_TIMEOUT)

    def youtubeNotFound(self):
        # 유튜브 url을 찾지 못했을 경우
        print(f'🚨 해당 유튜브 영상 URL을 확인할 수 없습니다. 다시 시도해주세요.\n{LINE}')
        self.__init__()

    def getVideoId(self):
        while True:
            videoId = input("1️⃣  영상 URL을 입력해주세요 : ")
            # 유튜브 URL 인지 확인
            try:
                if 'youtu' not in videoId:
                    raise Exception
                if 'watch?v=' in videoId:
                    videoId = videoId.split("=")[1]
                else:
                    videoId = videoId.split("/")[-1]
                return videoId
            except Exception as e:
                print("🚨 해당 영상을 확인할 수 없습니다. 다시 입력해주세요.", e)
                print(LINE)

    def getVideoInfo(self):
        # 유저로부터 받은 동영상url이 정상적인지 확인하고 정상인 경우 동영상 정보를 리턴합니다.
        res = requests.get(f"{YOUTUBEAPI_VIDEOS_URL}?key={self.apikey}&part=snippet,statistics&id={self.videoId}")
        if res.status_code == 200:
            data = res.json()
            items = data["items"]
            # url이 잘못되어도 status 200값과 함께 items는 빈 배열을 리턴합니다.
            if len(items) < 1 :
                self.youtubeNotFound()
            title = items[0]["snippet"]["title"]
            description = items[0]["snippet"]["description"].replace("\n", " ").replace("<br>"," ")
            if len(description) > 25:
                description = f'{description[:25]}...'
            commentCount = int(items[0]["statistics"]["commentCount"])
            likeCount = int(items[0]["statistics"]["likeCount"])
            viewCount = int(items[0]["statistics"]["viewCount"])
            msg = f'✅ 영상이 확인되었습니다.\n'
            msg += f'>  제목: {title}\n'
            msg += f'>  설명: {description}...\n'
            msg += f">  조회수 {format(viewCount, ',')} | 좋아요 {format(likeCount, ',')} | 댓글수 {format(commentCount, ',')}\n"
            msg += LINE
            self.printer.saveOnLocal(msg)
            return {
                "title": title,
                "description": description,
                "viewCount": viewCount,
                "likeCount": likeCount,
                "commentCount": commentCount,
            }
        else:
            # 200을 리턴받지 않으면 유튜브 api 일일한도 초과로 403을 리턴받습니다.
            self.youtubeAPIExceed()

    def getVideoComments(self):
        self.printer.saveOnLocal(f"2️⃣  댓글을 가져옵니다.")
        url = f"{YOUTUBEAPI_COMMENTTHREADS_URL}?key={self.apikey}&maxResults={MAXRESULTS}&part=snippet,replies&videoId={self.videoId}"
        # 동영상의 댓글을 불러옵니다.
        self.getComments(url, 1)
        self.comments = sorted(self.comments, key=itemgetter('publishedAt'))
        self.printer.saveOnLocal(f'✅ 모든 댓글이 확인되었습니다.\n{LINE}')
        return self.comments

    def getComments(self, url, cnt):
        # 1회 요청에 100개까지 받아올 수 있고 100개가 초과하는 댓글의 동영상은 다음 페이지 토큰이 주어집니다.
        # 재귀적으로 다음 페이지 토큰이 없을때 까지 실행됩니다.
        percent = cnt * MAXRESULTS / self.videoInfo["commentCount"] * 100
        if percent > 100:
            percent = 100
        currCnt = cnt * MAXRESULTS
        self.printer.saveOnLocal(f"- ({percent:6.2f}%) {format(currCnt,',')}개 까지 추출중...")
        response = requests.get(url).json()
        try:
            for item in response["items"]:
                self.comments.append(self.validateComment(item))
        except Exception as e:
            # Exception 발생은 api 일일조회를 초과했을 때 나타납니다.
            # api 키를 바꾸고 현재 댓글을 다시 조회합니다.
            self.youtubeAPIExceed()
            self.getComments(url, cnt)

        if "nextPageToken" in response.keys():
            # 다음 댓글이 있으면 재귀로 다시 불러옵니다.
            nextPageToken = str(response["nextPageToken"])
            self.getComments(f"{url}&pageToken={nextPageToken}", cnt + 1)

    def validateComment(self, item):
        # api로 받은 json 에서 필요한 값만 저장합니다.
        publisedAt = item["snippet"]["topLevelComment"]["snippet"]["publishedAt"]
        publisedAt = datetime.strptime(publisedAt, "%Y-%m-%dT%H:%M:%SZ")
        publisedAt = datetime.strftime(utc_to_local(publisedAt), "%y-%m-%d %H:%M:%S")
        comment = {
            "publishedAt" : publisedAt,
            "authorDisplayName" : item["snippet"]["topLevelComment"]["snippet"]["authorDisplayName"],
            # <a href="about:invalid#zCSafez"></a>는 PC에서 유튜브에만 존재하는 이모티콘이 이러게 나옵니다
            "textDisplay" : item["snippet"]["topLevelComment"]["snippet"]["textDisplay"].replace("<br>"," ").replace("&quot;"," ").replace('<a href="about:invalid#zCSafez"></a>','[이모티콘]'),
            "likeCount" : item["snippet"]["topLevelComment"]["snippet"]["likeCount"],
            "authorChannelId" : f'@{item["snippet"]["topLevelComment"]["snippet"]["authorChannelId"]["value"]}',
            "totalReplyCount" : item["snippet"]["totalReplyCount"]
        }
        return comment

    def addDiffTime(self, comments):
        # 배열을 돌면서 현재댓글과 다음댓글의 시간차이를 저장합니다.
        # 마지막 댓글은 현재시간과 시간차이를 저장합니다.
        lastCommentDiffTime = (datetime.now() - datetime.strptime(comments[-1]["publishedAt"], "%y-%m-%d %H:%M:%S")).total_seconds()/60
        comments[-1].update({"diffTime": lastCommentDiffTime})
        for idx in range(0, len(comments)-1):
            curr = datetime.strptime(comments[idx]["publishedAt"], "%y-%m-%d %H:%M:%S")
            next = datetime.strptime(comments[idx+1]["publishedAt"], "%y-%m-%d %H:%M:%S")
            diffTime = (next-curr).total_seconds()/60
            # 삭제되는 댓글이 있을 수 있어서 음수가 나오면 0으로 표시합니다.
            if diffTime < 0:
                diffTime = 0.00
            comments[idx].update({"diffTime": diffTime})

    def printMostCommenter(self):
        # 가장 많이 댓글 작성한 사람을 MOST_CNT번 출력합니다.
        userDict = {}
        for comment in self.comments:
            username = f'{comment["authorDisplayName"]} | {comment["authorChannelId"]}'
            if username in userDict:
                userDict[username] += 1
            else:
                userDict[username] = 1
        mostCommenter = sorted(userDict.items(), key=lambda x:x[1], reverse=True)
        msg = f'\n{LINE}\n👑 가장 댓글 많이 작성한 사람 (총 {format(len(mostCommenter),",")}명)\n'
        if len(mostCommenter) > MOST_CNT:
            for idx in range(0, MOST_CNT):
                msg += f'{mostCommenter[idx][1]}회 | {mostCommenter[idx][0]}\n'
        else:
            for idx in range(0, len(mostCommenter)):
                msg += f'{mostCommenter[idx][1]}회 | {mostCommenter[idx][0]}\n'
        self.printer.saveOnLocal(msg)

    def printMostLiker(self):
        # 가장 많이 좋아요 받은 댓글을 MOST_CNT번 출력합니다.
        mostLiker = []
        for comment in sorter(self.comments, 'likeCount'):
            if comment['likeCount'] > 0:
                mostLiker.append(comment)
            else:
                mostLiker = mostLiker[:MOST_CNT]
        self.printComments(mostLiker, f'{LINE}\n👑 가장 좋아요 많이 받은 댓글')

    def printMostReply(self):
        # 가장 많이 답글 받은 댓글을 MOST_CNT번 출력합니다.
        mostReply = []
        for comment in sorter(self.comments, 'totalReplyCount'):
            if comment['totalReplyCount'] > 0:
                mostReply.append(comment)
            else:
                mostReply = mostReply[:MOST_CNT]
        self.printComments(mostReply, f'{LINE}\n👑 가장 많이 답글 달린 댓글')

    def printMostDiffTime(self):
        # 가장 많이 버틴 댓글을 MOST_CNT번 출력합니다.
        mostDiffTime = sorter(self.comments, 'diffTime')
        self.printComments(mostDiffTime[:MOST_CNT], f'{LINE}\n👑 가장 오래버틴 댓글 (삭제된 댓글은 집계하지 않습니다.)')

    def printUsersComment(self, username):
        # 특정 유저가 작성한 댓글을 모두 출력합니다.
        userComment = []
        for comment in self.comments:
            if comment["authorDisplayName"] == username:
                userComment.append(comment)
        if len(userComment) == 0:
            self.printer.saveOnLocal(f'\n{LINE}\n👑 [{username}]님이 작성한 댓글이 없습니다\n')
        else:
            mostLike = sorter(userComment, 'likeCount')[0]["likeCount"]
            mostReply = sorter(userComment, 'totalReplyCount')[0]["totalReplyCount"]
            mostDiffTime = sorter(userComment, 'diffTime')[0]["diffTime"]
            self.printComments(userComment, f'{LINE}\n👑 [{username}]이 작성한 댓글 (삭제된 댓글은 집계하지 않습니다.)')
            self.printer.saveOnLocal(f'\n총 [{format(len(userComment), ",")}]개의 댓글을 작성하셨습니다.')
            self.printer.saveOnLocal(f'  💚 [{username}]님이 받은 가장 많은 좋아요 갯수는 [{mostLike}]개 입니다')
            self.printer.saveOnLocal(f'  📩 [{username}]님이 받은 가장 많은 답글 갯수는 [{mostReply}]개 입니다')
            self.printer.saveOnLocal(f'  ⏰ [{username}]님이 가장 오래 버틴 댓글은 [{mostDiffTime:.2f}]분 입니다')

    def printComments(self, comments, type=''):
        # 댓글들을 출력합니다.
        self.printer.saveOnLocal(f"\n{type}\n{SMALL_LINE}")
        msg = ''
        for comment in comments:
            msg += self.commentPrettier(comment)
            msg += '\n'
        self.printer.saveOnLocal(msg)

    def commentPrettier(self, comment):
        # 댓글을 출력할때 포맷을 정해줍니다.
        msg = f'{comment["publishedAt"]}\t닉네임: {comment["authorDisplayName"]} | {comment["authorChannelId"]}\n\t\t\t댓글: {comment["textDisplay"]:.33s}\n\t\t\t좋아요: {comment["likeCount"]} | 답글수: {comment["totalReplyCount"]} | 버틴시간: {comment["diffTime"]:.2f}\n{SMALL_LINE}'
        return msg

    def startDiffTimeEvent(self, stop):
        # 앞으로 실시간으로 댓글을 불러옵니다
        # 프로그램이 추출 이후 삭제한 댓글들도 저장됩니다
        msg = f'{LINE}\n✋ *{stop}분 버티기 이벤트 시작*\n> 제목: {self.videoInfo["title"]}\n> 지금부터 {YOUTUBEAPI_REQUEST_PER_TIME}초마다 새로운 댓글을 확인합니다.\n{LINE}\n\n{SMALL_LINE}'
        self.printer.sendToChat(msg)
        url = f"{YOUTUBEAPI_COMMENTTHREADS_URL}?key={self.apikey}&maxResults={LIVE_MAXRESULTS}&part=snippet,replies&videoId={self.videoId}"

        # 이미 추출한 댓글의 마지막 댓글은 버틴시간이 없기 때문에 -1 까지 추출합니다.
        time.sleep(1)
        self.printer.sendToChat(self.commentPrettier(self.comments[-2]))

        while True:
            try:
                # 이미 추출한 댓글의 publishedAt, authorChannelId 를 합쳐서 고유 key를 생성합니다
                prev = self.comments
                keys = []
                for p in prev:
                    key = f'{p["publishedAt"]}{p["authorChannelId"]}'
                    keys.append(key)

                # comments 리스트에 받아놓는데 에러 발생하면 유튜브 api 키 변경
                comments = []
                response = requests.get(url).json()
                try:
                    for item in response["items"]:
                        comments.append(self.validateComment(item))
                except Exception as e:
                    print("ERROR", e)
                    self.youtubeAPIExceed()

                # 새로운 댓글이 나오면 New 배열에 저장합니다.
                found = False
                new = []
                for comment in comments:
                    key = f'{comment["publishedAt"]}{comment["authorChannelId"]}'
                    if key not in keys:
                        new.append(comment)
                        self.comments.append(comment)
                        found = True

                # 새로운 댓글 -1 까지 추출 (끝은 diffTime이 없기 떄문)
                winner = []
                if found:
                    self.addDiffTime(self.comments)
                    for comment in self.comments[len(self.comments)-len(new)-1:-1]:
                        if comment["diffTime"] > int(stop):
                            winner.append(comment)
                        self.printer.sendToChat(self.commentPrettier(comment))

                # 우승자가 있으면 프로그램 종료
                if len(winner) > 0:
                    msg = (f"\n@channel\n👑  {stop}분 버티기 당첨자가 나왔습니다.\n")
                    msg += (f'{SMALL_LINE}\n{self.commentPrettier(winner[0])}\n')
                    msg += ("이벤트를 종료합니다.")
                    self.printer.sendToChat(msg)
                    break

                # 계속해서 리퀘스트를 보내면 api 일일횟수가 금방 달성하기 떄문에 sleep
                # 10초가 적당합니다.
                time.sleep(YOUTUBEAPI_REQUEST_PER_TIME)
            except Exception as e:
                # 에러가 나면 api 일일횟수 초과 api 키 변경
                print("ERROR", e)
                self.youtubeAPIExceed()

os.system('clear')
youtube = Youtube()

synology api

Synology 챗봇으로 텍스트를 전송하기 위한 간단한 API입니다. 장고로 제작했습니다.

synology api
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class Comment(APIView):
    title = "배돌이의당구생활"
    headers = {'Content-Type': 'text/text; charset=utf-8'}
    synology = SynologyChat()
    baedori_url = BAEDORI_BOT_URL

    def get(self, request):
        msg = request.data.get("msg")
        self.synology.send_message(self.baedori_url, self.title, msg)
        return Response(status=200)

class SynologyChat():
    headers = {'Content-Type': 'text/text; charset=utf-8'}

    def send_message(self, url, title, message):
        message = self.clean_text(message)
        payload = 'payload={"text": "' + message + '"}'
        res = requests.post(url, data=payload.encode('utf-8'), headers=self.headers)
        time.sleep(1)
        if res.json()["success"] != True:
            self.send_error(res, title)

    def send_error(self, res, title):
        LogSave(f">{ADMIN_NICKNAME} {title} \nres > {res.json()}")

    def clean_text(self, text):
        text = text.replace("%","퍼센트")
        # text = re.sub('[%+#\^@*\"※~ㆍ!&;』‘\(\)\\\'…》\”\“\’·]', ' ', text)
        text = re.sub('[%+#\^\"※~ㆍ!&;』‘\\\'…》\”\“\’·]', ' ', text)
        text = text.replace("</p>"," ").replace("<p>","")
        text = text.replace("<br>", " ").replace("nbsp", " ")
        text = text.replace("  ", " ").replace("  "," ")
        return text

exe 파일로 추출

utils.py 코드를 pyinstaller로 exe파일로 추출합니다. 가상환경을 pipenv를 사용했습니다.

1
2
pipenv install pyinstaller
pipenv run pyinstaller -F utils.py

위 커맨드를 진행하면 프로젝트 상위 폴더에 ‘dist’ 디렉토리가 생성되고 exe 파일이 생성됩니다.

크기는 약 7MB며 아이콘은 따로 변경했습니다.

결과

1. 모든댓글조회

2. 가장 많은 좋아요 댓글 조회

3. 가장 많은 답글수 댓글 조회

4. 가장 많이 작성한 댓글 조회

5. 가장 많이 작성한 댓글 조회

6. 닉네임 조회

7. 댓글 버티기 이벤트 (일정시간마다 계속 새로운 댓글 확인)

로컬 파일 저장 & 시놀로지 챗봇

프로그램에 출력되는 모든 글은 동시에 로컬에도 저장됩니다.

7번 댓글 버티기 이벤트 진행시 챗봇으로 모든 댓글이 오게 해놓고 당첨자 발생시 채널에 알럿이 오게 했습니다.


마치며

유튜브에서 제공하는 API로 여러가지 이벤트를 기획할 수 있습니다. 다음번에는 인스타그램 API로 비슷한 이벤트를 진행해볼까 계획중입니다. 궁금하신 점이 있으시면 댓글로 남겨주시기 바랍니다.

도움이 되셨으면 배돌이의 당구생활 에도 한번 놀러와주세요 ! 다양한 이벤트 진행중 ~💚