Examples

aiohttp

from aiohttp import web
from jsonrpcserver import method, Result, Success, async_dispatch


@method
async def ping() -> Result:
    """Return a ``Success`` result carrying the string ``"pong"``."""
    reply = Success("pong")
    return reply


async def handle(request):
    """Dispatch the request body as JSON-RPC and return the result as JSON.

    The body text is read from the aiohttp request, passed to
    ``async_dispatch``, and the returned string becomes the response text
    with an ``application/json`` content type.
    """
    payload = await request.text()
    reply = await async_dispatch(payload)
    return web.Response(text=reply, content_type="application/json")


# Build the aiohttp application and route POST requests on "/" to handle().
app = web.Application()
app.router.add_post("/", handle)

# Start the server on port 5000 only when this file is run as a script.
if __name__ == "__main__":
    web.run_app(app, port=5000)

See blog post.

Django

Create a views.py:

from django.http import HttpResponse
from django.views.decorators.csrf import csrf_exempt
from jsonrpcserver import method, Result, Success, dispatch


@method
def ping() -> Result:
    """Return a ``Success`` result carrying the string ``"pong"``."""
    reply = Success("pong")
    return reply


@csrf_exempt
def jsonrpc(request):
    """Dispatch the decoded request body and return the result as JSON.

    The view is marked ``csrf_exempt``; the dispatched string is returned in
    an ``HttpResponse`` with an ``application/json`` content type.
    """
    payload = request.body.decode()
    reply = dispatch(payload)
    return HttpResponse(reply, content_type="application/json")

See blog post.

Flask

from flask import Flask, Response, request
from jsonrpcserver import method, Result, Success, dispatch

# Flask application object; the POST route for "/" is registered on it.
app = Flask(__name__)


@method
def ping() -> Result:
    """Return a ``Success`` result carrying the string ``"pong"``."""
    reply = Success("pong")
    return reply


@app.route("/", methods=["POST"])
def index():
    """Dispatch the POSTed body as JSON-RPC and return the result as JSON."""
    payload = request.get_data().decode()
    reply = dispatch(payload)
    return Response(reply, content_type="application/json")


# Start the Flask app via app.run() only when this file is run as a script.
if __name__ == "__main__":
    app.run()

See blog post.

http.server

This example uses Python’s built-in http.server module.

from http.server import BaseHTTPRequestHandler, HTTPServer

from jsonrpcserver import method, Result, Success, dispatch


@method
def ping() -> Result:
    """Return a ``Success`` result carrying the string ``"pong"``."""
    reply = Success("pong")
    return reply


class TestHttpServer(BaseHTTPRequestHandler):
    """HTTP request handler that passes POST bodies to ``dispatch``."""

    def do_POST(self):
        """Read the request body, dispatch it, and write the result as JSON.

        A missing ``Content-Length`` header is treated as an empty body
        instead of raising ``TypeError`` from ``int(None)``.
        """
        # Process request
        # headers.get() returns None when the header is absent, so fall back to 0.
        length = int(self.headers.get("Content-Length") or 0)
        request = self.rfile.read(length).decode()
        response = dispatch(request)
        body = response.encode()
        # Return response
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        # Tell the client exactly how many bytes follow.
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)


# Serve on localhost:5000 until interrupted, only when run as a script.
if __name__ == "__main__":
    server = HTTPServer(("localhost", 5000), TestHttpServer)
    server.serve_forever()

See blog post.

Plain jsonrpcserver

This example uses jsonrpcserver’s built-in serve function.

from jsonrpcserver import method, Result, Success, serve


@method
def ping() -> Result:
    """Return a ``Success`` result carrying the string ``"pong"``."""
    reply = Success("pong")
    return reply


# Call jsonrpcserver's serve() with no arguments when run as a script.
if __name__ == "__main__":
    serve()

Socket.IO

from flask import Flask
from flask_socketio import SocketIO, send
from jsonrpcserver import method, Result, Success, dispatch

# Flask application wrapped by a SocketIO instance; the "message" event
# handler is registered on `socketio`.
app = Flask(__name__)
socketio = SocketIO(app)


@method
def ping() -> Result:
    """Return a ``Success`` result carrying the string ``"pong"``."""
    reply = Success("pong")
    return reply


@socketio.on("message")
def handle_message(request):
    """Dispatch an incoming "message" event and send back any response.

    Nothing is sent when ``dispatch`` returns a falsy value.
    """
    response = dispatch(request)
    if not response:
        return
    send(response, json=True)


# Run the Socket.IO server on port 5000 only when executed as a script.
if __name__ == "__main__":
    socketio.run(app, port=5000)

See blog post.

Tornado

from jsonrpcserver import method, Result, Success, async_dispatch
from tornado import ioloop, web


@method
async def ping() -> Result:
    """Return a ``Success`` result carrying the string ``"pong"``."""
    reply = Success("pong")
    return reply


class MainHandler(web.RequestHandler):
    """Tornado handler that passes POST bodies to ``async_dispatch``."""

    async def post(self) -> None:
        """Dispatch the request body and write any response as JSON.

        Nothing is written when ``async_dispatch`` returns a falsy value.
        """
        request = self.request.body.decode()
        if response := await async_dispatch(request):
            # Writing a str does not set a JSON content type, so declare it
            # explicitly, as the other HTTP examples do.
            self.set_header("Content-Type", "application/json")
            self.write(response)


# Tornado application with a single route: "/" is served by MainHandler.
app = web.Application([(r"/", MainHandler)])

# Listen on port 5000 and start the IOLoop only when run as a script.
if __name__ == "__main__":
    app.listen(5000)
    ioloop.IOLoop.current().start()

See blog post.

Websockets

import asyncio

from jsonrpcserver import method, Success, Result, async_dispatch
import websockets


@method
async def ping() -> Result:
    """Return a ``Success`` result carrying the string ``"pong"``."""
    reply = Success("pong")
    return reply


async def main(websocket, path=None):
    """Receive one message, dispatch it, and send back any response.

    ``path`` is unused. It defaults to ``None`` so the handler works both
    with websockets versions that call ``handler(websocket, path)`` and with
    newer ones that call ``handler(websocket)`` only.

    Nothing is sent when ``async_dispatch`` returns a falsy value.
    """
    if response := await async_dispatch(await websocket.recv()):
        await websocket.send(response)


async def start_server():
    """Serve ``main`` on localhost:5000 until the task is cancelled."""
    async with websockets.serve(main, "localhost", 5000):
        # Wait on a future that never completes so the server stays up.
        await asyncio.Future()


# asyncio.run() creates and manages the event loop itself. Calling
# asyncio.get_event_loop() with no running loop is deprecated, and the
# server is now created inside a running loop.
asyncio.run(start_server())

See blog post.

Werkzeug

from jsonrpcserver import method, Result, Success, dispatch
from werkzeug.serving import run_simple
from werkzeug.wrappers import Request, Response


@method
def ping() -> Result:
    """Return a ``Success`` result carrying the string ``"pong"``."""
    reply = Success("pong")
    return reply


@Request.application
def application(request):
    """Dispatch the decoded request data and return it as a JSON response.

    The response always carries status 200 and an ``application/json``
    mimetype.
    """
    payload = request.data.decode()
    reply = dispatch(payload)
    return Response(reply, 200, mimetype="application/json")


# Serve `application` on localhost:5000 only when executed as a script.
if __name__ == "__main__":
    run_simple("localhost", 5000, application)

See blog post.

ZeroMQ

from jsonrpcserver import method, Result, Success, dispatch
import zmq

# Module-level ZeroMQ REP socket created from a fresh context; it is bound
# and served in the __main__ block.
socket = zmq.Context().socket(zmq.REP)


@method
def ping() -> Result:
    """Return a ``Success`` result carrying the string ``"pong"``."""
    reply = Success("pong")
    return reply


# Bind to TCP port 5000 and answer requests forever when run as a script.
if __name__ == "__main__":
    socket.bind("tcp://*:5000")
    while True:
        # Every received message gets a reply, even if the dispatch result
        # is an empty string.
        raw = socket.recv()
        reply = dispatch(raw.decode())
        socket.send_string(reply)

See blog post.

ZeroMQ (asynchronous)

from jsonrpcserver import method, Result, Success, async_dispatch
import aiozmq
import asyncio
import zmq


@method
async def ping() -> Result:
    """Return a ``Success`` result carrying the string ``"pong"``."""
    reply = Success("pong")
    return reply


async def main():
    """Serve JSON-RPC requests over a ZeroMQ REP stream bound to port 5000.

    Each iteration reads one multipart message, dispatches its first frame,
    and writes the encoded result back as a single-frame reply.
    """
    rep = await aiozmq.create_zmq_stream(zmq.REP, bind="tcp://*:5000")
    while True:
        request = (await rep.read())[0].decode()
        response = await async_dispatch(request)
        # A REP socket must answer every request before it can receive the
        # next one, so always reply, even when the dispatch result is empty.
        # The synchronous ZeroMQ example does the same.
        rep.write((response.encode(),))


# Install aiozmq's event loop policy, then run main() to completion on the
# resulting loop, only when executed as a script.
if __name__ == "__main__":
    asyncio.set_event_loop_policy(aiozmq.ZmqEventLoopPolicy())
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

See blog post.