跳转至

基于Django的websocket实现

get_channel_layerget_channel_layer官方文档

https://channels.readthedocs.io/en/latest/index.html

安装

pip install Django==1.11.11 channels==2.1.4 channels-redis==2.3.1 asgiref==2.1.6 asgi-redis==1.4.3

创建django项目

django-admin.py startproject example_channels
cd example_channels
python manage.py startapp example

注册 APP

    INSTALLED_APPS = [
        'django.contrib.admin',
        'django.contrib.auth',
        'django.contrib.contenttypes',
        'django.contrib.sessions',
        'django.contrib.messages',
        'django.contrib.staticfiles',
        'channels',  # 注册channels
        'example',  # 添加新创建的app
    ]

配置路由(1)

创建默认路由 (主WS路由)

Channels路由配置类似于Django URLconf, 因为当通道服务器接收到HTTP请求时, 它告诉通道运行什么代码

将从一个空路由配置开始,创建一个文件 example_channels/routing.py ,并包含以下代码

# 【channels】(第2步)设置默认路由在项目创建routing.py文件

from channels.routing import ProtocolTypeRouter

application = ProtocolTypeRouter({
    # Empty for now (http->django views is added by default)
})

设置执行路由对象 (指定routing)

修改 settings 配置 将ASGI_APPLICATION设置为指向路由对象作为根应用程序

ASGI_APPLICATION = "example_channels.routing.application"

channel layer

  • channel允许用户在应用程序的不同实例之间进行通讯,这是实现分布式实时应用的一部分如果你不想所有的message和event都经由数据库的话,此外,它还可以和工作进程结合使用来创建基本的任务队列或者卸载任务,但是channel本身是不附带channel layer的,因为每一个channel layer都依赖于不同的网络数据传输方式

  • 可以是一个FIFO队列,通常使用Redis 信道层是一种通信系统,它允许多个消费者实例彼此交谈,以及与Django的其他部分交谈,channel官方推荐的是配置channel_redis, 这是一个使用Redis作为传输的Django维护层

  • 通道是一个可以将邮件发送到的邮箱,每个频道都有一个名称,任何拥有频道名称的人都可以向频道发送消息

  • 一组是一组相关的通道,一个组有一个名称,任何具有组名称的人都可以按名称向组添加/删除频道,并向组中的所有频道发送消息,无法枚举特定组中的通道

  • 每个使用者实例都有一个自动生成的唯一通道名,因此可以通过通道层进行通信

  • 对于channel layer的方法(包括send(),group_send(),group_add()等)都属于异步方法,这意味着在调用的时候都需要使用await,而如果想要在同步代码中使用它们,就需要使用装饰器asgiref.sync.async_to_sync

from asgiref.sync import async_to_sync

async_to_sync(channel_layer.send)("channel_name", {...})
  • 在我们的聊天应用程序中,我们希望同一个房间中的多个聊天消费者实例相互通信,为此,我们将让每个聊天消费者将其频道添加到一个组,该组的名称基于房间名称,这将允许聊天用户向同一房间内的所有其他聊天用户发送消息

  • 不同于channel1.x,channel2.0的channel layer 仅适用于高级应用程序之间的通讯,当用户发送message的时候,会先传送到consumer中,然后对应的consumer会监听对应的group或者channel,而不是直接将message传输到socket中这意味着用户可以传输高level的event到channel layer,然后让consumer进行处理这些event然后选择低level的网络连接进行执行

  • 比如,Andrew Godwin的存储库的的实例channels_examples通过channel layer 发送这样一个事件

    await self.channel_layer.group_send(
        room.group_name,
        {
            "type": "chat.message",
            "room_id": room_id,
            "username": self.scope["user"].username,
            "message": message,
        }
    )
    
  • 然后consumer定义一个处理函数接收这个事件并转化成websocket框架

async def chat_message(self, event):
    """
    Called when someone has messaged our chat.
    """
    # Send a message down to the client
    await self.send_json(
        {
            "msg_type": settings.MSG_TYPE_MESSAGE,
            "room": event["room_id"],
            "username": event["username"],
            "message": event["message"],
        },
    )
  • 所有基于SyncConsumer或AsyncConsumer的consumer都提供了两个属性: self.channel_layer和self.channel_name属性, 分别为指向channel layer本身和consumer的channel layer的名字
  • 发送到channel的message或者添加到channel的group的message,都会由consumer接收,就像来自连接的客户端的事件一样,根据相应的命名方式选择对应的consumer,命名方法是将 . 替换成 _ ,在上述的例子中,type的value是chat.message,对应的处理方法就是chat_message

配置

channel2.0不同于1.x,channel layer属于完全可选部分,这意味着如果不想使用的话可以设置成空字典{}或者CHANNEL_LAYER不配置即可

  # settings.py
  CHANNEL_LAYERS = {
   "default": {
       "BACKEND": "channels_redis.core.RedisChannelLayer",
       "CONFIG": {
           "hosts": ["redis://:password@127.0.0.1:6379/0"],
       },
   },
  }

验证安装

为了验证 channel layer可以与Redis通信 我们使用 python manage shell 打开django的shell

import channels.layers
channel_layer = channels.layers.get_channel_layer()
from asgiref.sync import async_to_sync
async_to_sync(channel_layer.send)('test_channel', {'type': 'hello'})
async_to_sync(channel_layer.receive)('test_channel')
# {'type': 'hello'}

channels_redis 的其他参考配置

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("localhost", 6379)],
        },
    },
}


CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            "hosts": ["redis://127.0.0.1:6379/8"],
        },
    },
}


CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            "hosts": [('127.0.0.1', 6379)],
        },
    },
}


CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": ["redis://:password@127.0.0.1:6379/0"],
            "symmetric_encryption_keys": [SECRET_KEY],
        },
    },
}

创建consumers.py(类似django视图)

我们在创建的应用目录下创建相关处理文件 example_channels/example/consumers.py

同步消费者很方便,因为他们可以调用常规的同步I / O函数,例如那些在不编写特殊代码的情况下访问Django模型的函数, 但是异步使用者可以提供更高级别的性能因为他们在处理请求时不需要创建其他线程

Consumer仅使用异步本机库(通道和通道层), 特别是它不访问同步Django模型。 因此,它可以被重写为异步而不会出现复杂情况

使用异步方式

  • 使用异步时继承自AsyncWebsocketConsumer而不是WebsocketConsumer
  • 所有方法都是async def而不是def
  • await用于调用执行I / O的异步函数
  • 在通道层上调用方法时不再需要async_to_sync
# 【channels】创建应用的消费者
from channels.generic.websocket import WebsocketConsumer, AsyncWebsocketConsumer
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
import json


class AsyncConsumer(AsyncWebsocketConsumer):
    async def connect(self):  # 连接时触发
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = 'notice_%s' % self.room_name  # 直接从用户指定的房间名称构造Channels组名称,不进行任何引用或转义。

        # 将新的连接加入到群组
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )

        await self.accept()

    async def disconnect(self, close_code):  # 断开时触发
        # 将关闭的连接从群组中移除
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )

    # Receive message from WebSocket
    async def receive(self, text_data=None, bytes_data=None):  # 接收消息时触发
        text_data_json = json.loads(text_data)
        message = text_data_json['message']

        # 信息群发
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'system_message',
                'message': message
            }
        )

    # Receive message from room group
    async def system_message(self, event):
        print(event)
        message = event['message']

        # Send message to WebSocket单发消息
        await self.send(text_data=json.dumps({
            'message': message
        }))

使用同步方式

这里仅仅做示例,不使用

# 同步方式,仅作示例,不使用
class SyncConsumer(WebsocketConsumer):
    def connect(self):
        # 从打开到使用者的WebSocket连接的chat/routing.py中的URL路由中获取'room_name'参数。
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        print('WebSocket建立连接:', self.room_name)
        # 直接从用户指定的房间名称构造通道组名称
        self.room_group_name = 'msg_%s' % self.room_name

        # 加入房间
        async_to_sync(self.channel_layer.group_add)(
            self.room_group_name,
            self.channel_name
        )  # async_to_sync(…)包装器是必需的,因为ChatConsumer是同步WebsocketConsumer,但它调用的是异步通道层方法。(所有通道层方法都是异步的。)

        # 接受WebSocket连接。
        self.accept()
        simple_username = self.scope["session"]["session_simple_nick_name"]  # 获取session中的值

        async_to_sync(self.channel_layer.group_send)(
            self.room_group_name,
            {
                'type': 'chat_message',
                'message': '@{} 已加入房间'.format(simple_username)
            }
        )

    def disconnect(self, close_code):
        print('WebSocket关闭连接')
        # 离开房间
        async_to_sync(self.channel_layer.group_discard)(
            self.room_group_name,
            self.channel_name
        )

    # 从WebSocket中接收消息
    def receive(self, text_data=None, bytes_data=None):
        print('WebSocket接收消息:', text_data)
        text_data_json = json.loads(text_data)
        message = text_data_json['message']

        # 发送消息到房间
        async_to_sync(self.channel_layer.group_send)(
            self.room_group_name,
            {
                'type': 'chat_message',
                'message': message
            }
        )

    # 从房间中接收消息
    def system_message(self, event):
        message = event['message']

        # 发送消息到WebSocket
        self.send(text_data=json.dumps({
            'message': message
        }))

函数说明

def connect(self): 连接时触发
def disconnect(self, close_code): 断开时触发
def receive(self, text_data): 接收消息时触发
self.channel_layer.group_add( self.room_group_name, self.channel_name ) 将新的连接加入到群组
self.channel_layer.group_discard( self.room_group_name, self.channel_name ) 将关闭的连接从群组中移除
self.channel_layer.group_send)( self.room_group_name, { 'type': 'chat_message', 'message': message } 信息群发
self.send(text_data=json.dumps({ 'message': message })) 单发消息
self.scope [ 'url_route'] [ 'kwargs'] [ 'ROOM_NAME'] 每个用户都有一个范围其中包含有关其连接的信息特别是包括URL路由中的任何位置或关键字参数以及当前经过身份验证的用户如果有
self.room_group_name ='chat_%s'self.room_name 直接从用户指定的房间名称构造Channels组名称不进行任何引用或转义
组名只能包含字母数字连字符和句点因此此示例代码将在具有其他字符的房间名称上失败
async_to_sync.....) 包装器是必需的因为ChatConsumer是同步WebsocketConsumer但它调用异步通道层方法。(所有通道层方法都是异步的。)
self.accept() 接受WebSocket连接
如果不在connect()方法中调用accept(),则拒绝并关闭连接例如您可能希望拒绝连接因为请求的用户无权执行请求的操作
如果您选择接受连接建议将accept()作为connect()中的最后一个操作

配置路由(2)

应用下创建routing.py

# 【channels】为应用程序创建一个路由配置,该应用程序具有到消费者的路由
from django.conf.urls import url
from example import consumers

websocket_urlpatterns = [
    # url(r'^ws/msg/(?P<room_name>[^/]+)/$', consumers.SyncConsumer),  # 同步方法不测试了
    url(r'^ws/msg/(?P<room_name>[^/]+)/$', consumers.AsyncConsumer),
]

修改项目主路由

from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from channels.sessions import SessionMiddlewareStack
import example.routing

application = ProtocolTypeRouter({
    # (http->django views is added by default)
    # 【channels】添加路由配置指向应用的路由模块
    'websocket': SessionMiddlewareStack(  # 使用Session中间件,可以请求中获取session的值当然,也可以去掉或者使用其他中间件
        URLRouter(
            example.routing.websocket_urlpatterns
        )
    ),
})

外部发送消息

无论是消息的推送或者消息的接受,都是经过channel layer进行传输,且经过 async_to_sync 包裹

在实例化之后就可以进行调用了,但是需要注意的是,get_channel_layer()需要异步使用,调用的时候需要添加await或者使用async_to_sync来调用

如果在consumer的范围外发送message到channel layer的时候,self.channel_layer是不可用的,此时需要使用get_channel_layer方法

for chat_name in chats:
    await channel_layer.group_send(
        chat_name,
        {"type": "system_message", "message": announcement_text},
    )```

使用async_to_sync来包裹调用

def send_group_msg(room_name, message):
    # 从Channels的外部发送消息给Channel
    """
    :param room_name:
    :param message:
    :return:
    """
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(
        'notice_{}'.format(room_name),  # 构造Channels组名称
        {
            "type": "system_message",
            "message": message,
        }
    )
def send_channel_msg(channel_name,msg):

    channel_layer = get_channel_layer()
    async_to_sync( channel_layer.send)(channel_name, {
    "type": "system_message",
    "message":msg ,
})
    from example import consumers
    consumers.send_group_msg('ITNest', {'content': '1'})
    consumers.send_group_msg('ITNest', {'content': '1'})
    consumers.send_channel_msg('specific.QOXzRUBh!KDmyeFWEYdnR',{'content': '1'}) 

single channel

  • 每个应用程序实例,比如long-running的HTTP请求或者websocket连接都会产生一个相应的consumer实例,此时如果还启用了channel layer的话,consumer还会再生成一个对应的channel layer,并开始监听它的event

  • 这就意味着可以从进程的外部(从其他的consumer或者管理命令)来发送consumer event,而对于对应的consumer,也会做出响应,将这些来自consumer的event当做来自客户端的event一样进行处理

  • channel_name可以在consumer中调用self.channel_name属性获取

class ChatConsumer(WebsocketConsumer):

    def connect(self):
        # Make a database row with our channel name
        Clients.objects.create(channel_name=self.channel_name)

    def disconnect(self, close_code):
        # Note that in some rare cases (power loss, etc) disconnect may fail
        # to run; this naive example would leave zombie channel names around.
        Clients.objects.filter(channel_name=self.channel_name).delete()

    def chat_message(self, event):
        # Handles the "chat.message" event when it's sent to us.
        self.send(text_data=event["text"])
  • 但是,由于混合了来自channel layer和连接协议的event,需要确保类型名称不会发生冲突,建议像这里的代码一样添加前缀名称(chat.)来进行区分,避免冲突

  • 而要发送某个channel,只需要找到对应的channel_name,调用channel_layer.send就可以了

from channels.layers import get_channel_layer

channel_layer = get_channel_layer()
await channel_layer.send("channel_name", {
    "type": "chat.message",
    "text": "Hello there!",
})

single group

  • 对于大多数情况来说,发送到单人的channel并没有用,更多的情况下希望可以以广播的方式将message一次性发送给多个channel或者consumer,这不仅适用于想在向房间内的每个人发送消息,还适用于发送给连接了多个浏览器/标签/设备的用户

  • 对于group来说 允许从命名组中添加和删除channel,也支持发送group,提供group到期清理应用程序无法处理的连接

  • 你可以在connect方法中添加组,在disconnect中删除组,代码如下

# This example uses WebSocket consumer, which is synchronous, and so
# needs the async channel layer functions to be converted.
from asgiref.sync import async_to_sync

class ChatConsumer(WebsocketConsumer):

    def connect(self):
        async_to_sync(self.channel_layer.group_add)("chat", self.channel_name)

    def disconnect(self, close_code):
        async_to_sync(self.channel_layer.group_discard)("chat", self.channel_name)
  • 如果想要发送message到group中,可以使用group_send方法
class ChatConsumer(WebsocketConsumer):

    ...

    def receive(self, text_data):
        async_to_sync(self.channel_layer.group_send)(
            "chat",
            {
                "type": "chat.message",
                "text": text_data,
            },
        )

    def chat_message(self, event):
        self.send(text_data=event["text"])

channels将同步的MySQL转换为异步的:

# ORM语句同步变异步,方式一
from channels.db import database_sync_to_async
user = await database_sync_to_async(User.objects.get(username=username))

# ORM语句同步变异步,方式二
@database_sync_to_async
def get_username(username):
    return User.objects.get(username=username)

项目目录

65747-2bmm1rakg2l.png

在线测试

这里推荐一个网站 http://www.websocket-test.com/,本次搭建访问地址是 ws://ip:port/ws/msg/ITNest/ 测试使用的数据为{"message":123}

鸣谢

https://www.jianshu.com/p/0f75e2623418

https://www.cnblogs.com/wdliu/p/10028236.html

https://www.jianshu.com/p/1573e775e39b