قرار بود این نوشته را سه ماه پیش اینجا منتشر کنم اما نیمه‌کاره رهایش کردم چون روی تازگی مطالب وبلاگ کمی وسواس دارم و دوست دارم حرف جدیدی داشته باشد، نه صرفاً یک tutorial. این پیش نویس فراموشم شده بود تا اینکه شرکت از ما خواست که هر ماه یک مطلب فنی مرتبط روی وبلاگ شرکت منتشر کنیم؛ چون این وسواس روی وبلاگ شرکت برایم مطرح نبود سریعاً شروع به نوشتن کردم. الان متوجه شدم که وبلاگ شرکت از دسترس خارج شده و چون اجازهٔ انتشار مطلب در وبلاگ شخصی هم داشتیم مطلبی که نوشته بودم را با پیش‌نویسی که اینجا داشتم ترکیب و منتشر می‌کنم. پیشاپیش از تغییر لحن‌های (رسمی و عامیانه) احتمالی متن معذرت می‌خوام که نتیجهٔ همین ترکیب شدن متن‌هاست.

یکی از نیازمندی‌های پنل مدیریتی فروشگاه اینترنتی پادوکس امکان نمایش برخط مکان پیک‌های حامل سفارش‌ها است. پنل مدیریت فروشگاه تحت وب طراحی شده است لذا برای پیاده‌سازی این قابلیت راه‌حل خوبی که به ذهن می‌رسد استفاده از websocket است.

سوکت یکی از ساده‌ترین راه‌های ارتباطی دو طرفه بین دو پردازه (در یک سیستم یا بین دو سیستم) است. ارتباط از طریق سوکت را می‌توان به چهار عمل اصلیِ اتصال، قطع اتصال، ارسال و دریافت خلاصه کرد. در پروتکل HTTP همیشه آغازگر ارتباط، مشتری (مرورگر و اپ و…) است و سوکتِ بین مشتری و سرور بلافاصله بعد از اتمام ارتباط بسته می‌شود. اگر در زمانی پس از بسته شدن سوکت لازم شود سرور اطلاعاتی را به کاربر بفرستد راهی برای این کار ندارد؛ بنابراین برای برقراری ارتباطِ همزمان (real time) نیاز به سوکتی داریم که باز بماند، پروتکل وب‌سوکت این قابلیت را در مرورگر و سوار شده بر بستر HTTP فراهم می‌کند.

در این مطلب بررسی کوتاهی بر نحوهٔ پیاده‌سازی و ابزارهای استفاده شده خواهیم داشت.

اجزای سامانه

API پیک‌ها

پیک‌ها برای ارسال مختصات جغرافیایی‌شان و سایر اطلاعات مرتبط با ارسال سفارش‌هایشان از طریق یک Rest API با وب‌سرور لجستیک پادوکس در ارتباط هستند. برای پیاده‌سازی این API از ابزار Django REST framework استفاده کردیم. ارسال موقعیت مکانی از طرف اپلیکیشن پیک‌ها به صورت خودکار انجام می‌شود و فاصلهٔ زمانی دو آپدیت متوالی را سرور به اپ اطلاع می‌دهد. در این جا چون داده‌ای از طرف سرور به اپلیکیشن push نمی‌شود نیازی به استفاده از وب‌سوکت نداشتیم.

پنل مدیریتی – نقشه

برای نمایش نقشه از API گوگل‌مپ استفاده کردیم. امکان نمایش تعداد زیادی نشانگر (Marker) روی نقشه و به روز رسانی محل آن‌ها به همراه امکان تجمیع نشانگرهای نزدیک به هم در بزرگنمایی‌های کم و در نتیجه خلوت‌کردن صفحهٔ نقشه (Marker Clustering) از امکانات خوبی است که گوگل‌مپ در اختیار ما گذاشته است.

کارگزار وب سوکت

برای برقراری ارتباط دو طرفه بین پنل و وب سرور نیازمند یک برنامه کارگزار ساده و سریع هستیم. برای پیاده‌سازی این کارگزار با توجه به این‌که می‌تواند (و در آیندهٔ نزدیک باید) به صورت مستقل از سرویس لجستیک اجرا شود دستمان باز بود که از هر تکنولوژی دلخواهی استفاده کنیم. در نسخهٔ فعلی از زبان پایتون و ابزار aiohttp استفاده کردیم.

کانال ارتباطی – Message Broker

کارگزار وب سوکت و API پیک‌ها در دو اپلیکیشن مجزا اجرا می‌شوند. ذات یکی async و دیگری sync است، برای ارتباط بین این دو بخش سامانه تصمیم گرفتیم که از redis به عنوان یک broker ساده و سریع استفاده کنیم. گزینه‌های دیگر پیش رو استفاده از RabbitMQ یا Kafka بود که بیش از نیازمان بزرگ هستند. گزینهٔ دیگر هم استفاده از یک API مبتنی بر HTTP بود.

استفاده از HTTP مزیت خاصی از نظر سادگی اشکال‌زدایی (debugging) به redis ندارد. تعداد کامپوننت‌های بیشتری دارد و در هنگام استقرار روی سرور دردسر بیشتری به تیم DevOps وارد می‌کند. علاوه‌بر آن با باز کردن پورت جدید هم باعث شلوغی پورت‌های سرور می‌شود هم احتمال سهل انگاری (در تنظیمات firewall و…) و ایجاد مشکل امنیتی را بیشتر می‌کند. در مجموع قابلیت Pub/Sub تعبیه شده در redis یک گزینهٔ بسیار مناسب برای رفع این نیاز در پادوکس است.

برای اطلاعات بیشتر در رابطه با Pub/Sub ردیس می‌توانید به مستندات آن مراجعه کنید؛ اما به صورت خلاصه ردیس می‌تواند تعدادی کانال ارتباطی داشته باشد که هریک یک نام یکتا دارند. پردازه‌های متفاوتی می‌توانند اطلاعاتی را روی یک کانال منتشر کنند و پردازه‌های دیگری که مشترک کانال شده‌اند آن اطلاعات را دریافت می‌کنند. در مورد سرویس ما تنها یک پردازه — API پیک — نقش منتشر کننده را دارد و پردازهٔ کارگزار وب‌سوکت هم نقش مشترکِ (subscriber) کانال را دارد.

این معماری decoupling خوبی بین این دو پردازه ایجاد می‌کند. برای موارد تست یا اشکال‌زدایی می‌توانیم با استفاده از ابزارهای جانبی روی کانال پیام‌هایی را ارسال کنیم یا پیام‌های روی کانال را دریافت و بررسی کنیم. علاوه بر این با گسترش سرویس و نیازمندی‌ها منتشرکننده‌های بیشتری می‌توانند روی این کانال پیام ارسال کنند. پردازهٔ منتشر کننده و پردازهٔ مشترک می‌توانند روی سرورهای مجزایی استقرار یابند و با زبان‌های برنامه نویسی متفاوتی پیاده سازی شوند.

نمونهٔ پیاده‌سازی

API پیک‌ها

در این نمونه کد جهت سادگی مختصاتی که پیک‌ها ارسال می‌کنند ذخیره یا پردازش نمی‌شود و مستقیم به redis پاس داده می‌شوند. لطفاً دقت کنید که نام کانال redis را dlchannel گذاشتیم.

# settings.py
CACHES['broker'] = {
    "BACKEND": "django_redis.cache.RedisCache",
    "LOCATION": "redis://127.0.0.1:6379/1",
    "OPTIONS": {"CLIENT_CLASS": "django_redis.client.DefaultClient"}
}
# api.py
import json
from django_redis import get_redis_connection
from rest_framework import serializers, viewsets, mixins

class DriverLocationSerializer(serializers.Serializer):
    latitude = serializers.DecimalField(max_digits=9, decimal_places=6)
    longitude = serializers.DecimalField(max_digits=9, decimal_places=6)
    user = SimpleUserSerializer(read_only=True)

class DriverLocationEndpoint(mixins.CreateModelMixin, viewsets.GenericViewSet):
    serializer_class = DriverLocationSerializer

    def create(self, request, *args, **kwargs):
        # Validate data
        partial = kwargs.pop('partial', False)
        serializer = self.get_serializer(data=request.data, partial=partial)
        serializer.is_valid(raise_exception=True)
        
        # Publish to redis
        redis = get_redis_connection('broker')
        publish_data = {'user': request.user}
        publish_data.update(serializer.data)
        redis.publish('dlchannel', json.dumps(self.get_serializer(publish_data).data))

        return Response({
            'update_interval': '1/m'
        })

برای تست عملکرد درست این بخش از کامند subscribe ردیس می‌توانیم استفاده کنیم. پس از ارسال اطلاعات به API باید داده‌های مورد نظر در کانال dlchannel ردیس منتشر شوند. یک نمونه انتشار و دریافت موفق را در زیر می‌بینید:

$ redis-cli subscribe dlchannel
Reading messages... (press Ctrl-C to quit)
۱) "subscribe"
۲) "dlchannel"
۳) (integer) 1
۱) "message"
۲) "dlchannel"
۳) "{\"latitude\": 35.0001, \"longitude\": 53.12111, \"user\": {\"id\": 10, \"username\": \"test_driver1\"}}"

پنل مدیریتی

برای پیاده سازی نقشه می‌توانید به مستندات گوگل‌مپ مراجعه کنید. ما در پادوکس از reconnecting websocket استفاده کردیم تا اگر ارتباط پنل با کارگزار قطع شد به صورت خودکار اتصال مجدد برقرار شود. کد کلاینت بسیار ساده و خلاصه است. در زمان بارگذاری نقشه تابع setUpWebsocket فراخوانی می‌شود.

function setUpWebsocket() {
    wsc = new ReconnectingWebSocket('ws://127.0.0.1:8082/ws');
    wsc.onopen = function () {
    };
    wsc.onerror = function (error) {
        console.log('WebSocket Error ' + error);
    };
    wsc.onmessage = function (e) {
        updateOnePoint(jQuery.parseJSON(e.data));
    };
}

در تابع updateOnePoint نشانگر (Marker) روی نقشه با توجه به jsonی که از طرف وب‌سوکت دریافت شده جا به جا می‌شود.

کارگزار وب‌سوکت

در اینجا از یک dictionary استفاده می‌کنیم تا بتوانیم متغیر وب‌سوکت هر کاربر را پیدا کنیم. این دیکشنری در self['sockets']‎ ذخیره می‌شود. وب سوکت را در آدرس /ws‎ قرار می‌دهیم. لذا URL کامل وب سوکت همانطور که در کد جاوااسکریپت بالا دیدید ws://127.0.0.1:8082/ws‎ خواهد شد.

import asyncio, aiohttp
import logging


logger = logging.getLogger(__name__)


class App(aiohttp.web.Application):
    def __init__(self, *args, **options):
        self['sockets'] = dict()
        self.router.add_get('/ws', self.websocket_handler)

    async def websocket_handler(self, request):
        ws = web.WebSocketResponse(protocols=('echo-protocol',))
        logger.info('New websocket connection')
        await ws.prepare(request)
        self['sockets'][request.user] = ws
        logger.info('Connection established user=' + str(request.user))

app = App()
aiohttp.web.run_app(app, host='127.0.0.1', port=8082)

درخواست ایجاد ارتباط وب سوکت در دو مرحله انجام می‌شود. در مرحلهٔ اول (handshake) ابتدا یک درخواست HTTP عادی به کارگزار فرستاده می‌شود که شامل سرایند Connection: Upgrade‎ است. اگر کارگزار امکان برقراری ارتباط وب‌سوکتی را داشته باشد و درخواست از طرف کاربر مجاز باشد مرحلهٔ دوم که ساخت اتصال اصلی سوکت و تبادل داده است انجام می‌شود. در این مثال هیچ authenticationی انجام نمی‌شود. برای اعتبار سنجی کاربر در مرحلهٔ handshake باید درخواست را بررسی کنیم و اگر کاربر مجاز به برقراری ارتباط نبود با پاسخ‌های مناسب (مانند۴۰۱ یا ۴۰۳) جلوی ادامهٔ عملیات را بگیریم. فراخوانی تابع ws.prepare به معنی پایان handshake و آغاز ارتباط وب سوکت است، بنابراین هرگونه authentication ای باید قبل از فراخوانی این تابع انجام شود. تا زمانی که این متد فراخوانی نشده باشد ارتباط هنوز یک ارتباط HTTP ساده است و تمام response codeها و قوانین حاکم بر پروتکل HTTP در آن قابل استفاده است.

در این مثال از یک کد ساده و عملا بی مصرف در تابع authenticate استفاده می‌کنیم. در کاربردهای واقعی باید منطق مورد نظرتان من جمله session یا token را در آن پیاده سازی کنید.

def authenticate(self, request, ws):
    # Your authentication logic
    class User:
        def __init__(self, _id, username):
            self.id = _id
            self.username = username
    request.user = User(1, 'panel_admin')
    return True

# in websocker_handler method, before calling 'ws.prepare'

if not self.authenticate(request, ws):
    raise aiohttp.web.HTTPUnauthorized()

پس از برقراری ارتباط موفق به کمک یک حلقهٔ for می‌توانیم پیام‌هایی که از طرف مرورگر به سرور ارسال می‌شوند را دریافت کنیم. در پنل پادوکس پیامی از طرف مرورگر به سرور ارسال نمی‌شود بنابراین یک حلقهٔ ساده درست می‌کنیم که در انتها وقتی کاربر ارتباط وب‌سوکت را قطع کند آن سوکت را از دیکشنری sockets حذف کند.

async for msg in ws:
    if msg.type == aiohttp.WSMsgType.ERROR:
        logger.error('Connection error. user=%s exception=%s' %
                        (str(request.user), ws.exception()))
    elif msg.type == aiohttp.WSMsgType.CLOSE:
        logger.info('Request connection closure. user=' + str(request.user))
        await ws.close()

logger.info('websocket connection closed. user=' + str(request.user))

del self['sockets'][request.user]

تا اینجا کدی داریم که ارتباط وب‌سوکت را با مرورگر برقرار می‌کند، پیام‌های دریافت شده را پردازش می‌کند و در آخر سوکت را می‌بندد.
حالا باید کدی بنویسیم که در یک looper جدا به پیام‌هایی که از طرف redis دریافت می‌شوند را به تمام کاربران وب‌سوکت ارسال کند. کلاس PubSub کلاس کمکی‌ای است که مشترک کانال dlchannel می‌شود و پیام‌های دریافت شده را به وب‌سوکت‌ها ارسال می‌کند.

class PubSub(object):
    def __init__(self):
        self.loop = asyncio.get_event_loop()
        self.channel = None
        self.redis = None
        self.mpsc = Receiver()

    async def set_up(self, app: web.Application):
        self.redis = await aioredis.create_redis(('127.0.0.1', 6379))
        self.channel = await self.subscribe_to_channel('dlchannel')
        self.loop.create_task(self.read_messages(app))

    async def read_messages(self, app: web.Application):
        while await self.mpsc.wait_message():
            try:
                sender, message = await self.mpsc.get(encoding='utf-8')
                logger.info(f"Got message {message} from {sender.name}")
                self.loop.create_task(self.send_message_to_observers(app, message))
            except Exception as e:
                logger.error('Error getting message from redis', e)

    async def subscribe_to_channel(self, channel_name: str):
        return self.redis.subscribe(self.mpsc.channel(channel_name))

    async def send_message_to_observers(self, app: web.Application, message: str):
        promises = []
        for ws in app['sockets'].values():
            promises.append(ws.send_str(message))

        resolved_promises = await asyncio.gather(*promises, return_exceptions=True)
        for r in resolved_promises:
            if isinstance(r, Exception):
                logger.error(f'Failed to send message to one of subscribers {r}')
            else:
                logger.info('Successfully published the update')

کد نوشته شده در این کلاس سرراست و ساده است و نیازی به توضیح بیشتری ندارد.

زمانی که کارگزار آماده به کار می‌شود باید یک نسخه از این کلاس ساخته شود و متد set_up آن فراخوانی شود. بنابراین این دو خط را به انتهای متد __init__ کلاس App اضافه می‌کنیم:

self['subscriber'] = PubSub()
self.on_startup.append(self['subscriber'].set_up)

بهبودهای آینده

  • با قرار گرفتن کارگزار وب‌سوکت پشت یک API Gateway بخش authentication به صورت کامل می‌تواند حذف و واگذار به Gateway شود. در این حالت کلیدهای دیکشنری sockets بجای آبجکت User می‌تواند یک رشتهٔ متنی حاوی username باشد که از headerها به راحتی دریافت می‌شود.
  • استفاده از کتابخانهٔ django-websocket-redis و قابلیت Channels که به جننگو اضافه شده ممکن است در بهبود کد کمک کننده باشد.
  • بجای broadcast کردن پیام‌های دریافت شده از کانال ردیس، پیام فقط به کاربرانی ارسال شوند که نیاز و اجازهٔ دسترسی به آن اطلاعات دارند.
  • بازنویسی کارگزار با زبان برنامه‌نویسی و/یا فریم‌ورکی سریعتر و سبک‌تر.
  • بهبود کارگزار و نحوهٔ انتشار پیام‌ها در redis جهت Load Balance کردن ترافیک وب‌سوکت‌ها.
  • باز کردن دسترسی کاربران عادی سیستم جهت مشاهدهٔ سفارششان در نقشه.
  • استفاده از نقشهٔ سیدار