본문 바로가기
Python, API

[Python][Bybit V5 API] Private/Public Websocket 접속

by 오늘밤날다 2024. 5. 27.

 

import asyncio
import websockets
import json
import hmac
from datetime import *
import time


class Main():

    # Private Websocket에 접속하기 위한 인증준비
    def send_auth(self):
        api_key = 'USER_API_KEY'
        api_secret = 'USER_API_SECRET'
        expires = int((time.time() + 10) * 1000)
        _val = f'GET/realtime{expires}'
        signature = str(hmac.new(
            bytes(api_secret, 'utf-8'),
            bytes(_val, 'utf-8'), digestmod='sha256'
        ).hexdigest())
        ret = json.dumps({"op": "auth", "args": [api_key, expires, signature]})
        return ret

    # Private Websocket
    async def bybit_private_ws(self):
        async for ws_private in websockets.connect("wss://stream.bybit.com/v5/private?max_active_time=10m"): # "?max_active_time=10m" 추가
            await ws_private.send(self.send_auth)  # private websocket 접속 인증
            await ws_private.send('{"op":"subscribe","args":["position","execution","order","wallet"]}')

            try:
                while True:
                    data_rcv_strjson = await ws_private.recv()
                    rawdata = json.loads(data_rcv_strjson)
                    print(rawdata)

            # ConnectionClosed -> async for... 로 돌아가서 websocket에 접속
            except websockets.ConnectionClosed:
                print('Private websockets.ConnectionClosed')
                continue

    # Public Websocket
    async def bybit_public_ws(self):
        async for ws_public in websockets.connect("wss://stream.bybit.com/v5/public/linear"):
            await ws_public.send('{"op": "subscribe", "args": ["publicTrade.BTCUSDT"]}')

            send_ping_time = datetime.now()
            send_ping_process = False

            try:
                while True:
                    data_rcv_strjson = await ws_public.recv()
                    rawdata = json.loads(data_rcv_strjson)
                    print(rawdata)

                    if 'ret_msg' in rawdata:
                        # ping 전달 이후 ret_msg가 pong이 오면 send_ping_time을 현재 시각으로 업데이트 후 대기
                        if rawdata['ret_msg'] == 'pong':
                            send_ping_process = False
                            send_ping_time = datetime.now()

                    # send_ping_time이 20초가 경과한 경우 ping 전달
                    if send_ping_time + timedelta(seconds=20) < datetime.now() and send_ping_process is False:
                        send_ping_process = True
                        await ws_public.send('{"req_id": "public_ping", "op": "ping"}')

            # ConnectionClosed -> async for... 로 돌아가서 websocket에 접속
            except websockets.ConnectionClosed:
                print('Public websockets.ConnectionClosed')
                continue
                
    # asyncio loop 시작
    async def start_websocket(self):
        await asyncio.gather(self.bybit_public_ws(), self.bybit_private_ws())


if __name__ == "__main__":
    main = Main()
    asyncio.run(main.start_websocket())

 

 

예전에 쓰던 코드가 작동하지 않아서 Bybit 홈페이지에 가보니 V5가 나온 것 같아서 이 버전으로 만들어 봤다. Private과 Public 둘 다 연결하는 것까지는 성공했는데 'keepalive ping timeout; no close frame received' 오류 때문에 고생을 좀 했다.

 

대충 알아보니 웹소켓 연결은 내가 아무것도 하지 않고 일정 시간 동안 가만히 데이터를 받기만 하면 접속이 끊기게 되어있는 모양이다. 잘은 모르겠지만 이건 API마다 그 시간과 주기가 차이가 있는 것 같고 아무것도 하지 않아도 접속이 계속 유지되는 경우도 꽤나 있었지만 이상하게 Bybit의 V5 Websocket은 10분이 지나고 나니 칼같이 연결이 끊어져버렸다.

 

 

아무튼 해결책을 찾기 위해서 API Documentation을 찾아봤고 접속제한 시간을 늘리거나 heartbeat packet을 주기적으로 주고받으라는 설명이 있었다.

 

https://bybit-exchange.github.io/docs/v5/ws/connect

 

Connect | Bybit API Documentation

WebSocket public stream:

bybit-exchange.github.io

 

 

 

Public Websocket은 어찌어찌하여 20초마다 ping을 보내서 접속을 살려놓는 데 성공했는데 Private Websocket의 Loop 내에서 어떻게 주기적으로 ping을 보낼지 한참 고민을 했다. 분명히 무언가 방법이 있을 것 같은데 망할 asyncio loop 안에서 이걸 내 실력으로 구현해 내기가 쉽지 않았다. 그렇게 대략 반나절 가량 구글링을 해봤지만...

 

 

 

결론은 Private Websocket의 keep alive는 포기했다. 대신 접속이 끊겼을 경우 프로그램을 종료하지 않고 바로 다시 websocket 연결을 살리는 방법을 쓰기로 했다. websocket을 연결하는 부분을 async for와 try, except으로 감싸놓으면 해결된다.

 

https://websockets.readthedocs.io/en/stable/reference/asyncio/client.html

 

Client (asyncio)

Opening a connection: Using a connection:

websockets.readthedocs.io

 

async for websocket in websockets.connect(...):
    try:
        ...
    except websockets.ConnectionClosed:
        continue

 

 

 

Public의 경우 계속해서 시세가 들어오지만 Private은 주문 등 행위가 발생하지 않으면 크게 문제가 없기 때문에 최대치인 10분으로 설정해 놓은 max_active_time 이 지나고 순간적으로 발생하는 공백은 크게 문제가 없을 것 같다. 이 짧은 1초 미만의 시간 사이에 주문이 발생하는 것을 피하기 위해서는 websocket 접속이 진행 중이라는 변수를 설정하고 이 변수가 정상인 경우에만 주문을 진행하도록 해두었다.