Examples¶
Contents
aiohttp¶
from aiohttp import web
from jsonrpcserver import method, Result, Success, async_dispatch
@method
async def ping() -> Result:
return Success("pong")
async def handle(request):
return web.Response(
text=await async_dispatch(await request.text()), content_type="application/json"
)
app = web.Application()
app.router.add_post("/", handle)
if __name__ == "__main__":
web.run_app(app, port=5000)
See blog post.
Django¶
Create a views.py
:
from django.http import HttpResponse
from django.views.decorators.csrf import csrf_exempt
from jsonrpcserver import method, Result, Success, dispatch
@method
def ping() -> Result:
return Success("pong")
@csrf_exempt
def jsonrpc(request):
return HttpResponse(
dispatch(request.body.decode()), content_type="application/json"
)
See blog post.
Flask¶
from flask import Flask, Response, request
from jsonrpcserver import method, Result, Success, dispatch
app = Flask(__name__)
@method
def ping() -> Result:
return Success("pong")
@app.route("/", methods=["POST"])
def index():
return Response(
dispatch(request.get_data().decode()), content_type="application/json"
)
if __name__ == "__main__":
app.run()
See blog post.
http.server¶
Using Python’s built-in http.server module.
from http.server import BaseHTTPRequestHandler, HTTPServer
from jsonrpcserver import method, Result, Success, dispatch
@method
def ping() -> Result:
return Success("pong")
class TestHttpServer(BaseHTTPRequestHandler):
def do_POST(self):
# Process request
request = self.rfile.read(int(self.headers["Content-Length"])).decode()
response = dispatch(request)
# Return response
self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
self.wfile.write(response.encode())
if __name__ == "__main__":
HTTPServer(("localhost", 5000), TestHttpServer).serve_forever()
See blog post.
Plain jsonrpcserver¶
Using jsonrpcserver’s built-in serve
method.
from jsonrpcserver import method, Result, Success, serve
@method
def ping() -> Result:
return Success("pong")
if __name__ == "__main__":
serve()
Socket.IO¶
from flask import Flask
from flask_socketio import SocketIO, send
from jsonrpcserver import method, Result, Success, dispatch
app = Flask(__name__)
socketio = SocketIO(app)
@method
def ping() -> Result:
return Success("pong")
@socketio.on("message")
def handle_message(request):
if response := dispatch(request):
send(response, json=True)
if __name__ == "__main__":
socketio.run(app, port=5000)
See blog post.
Tornado¶
from jsonrpcserver import method, Result, Success, async_dispatch
from tornado import ioloop, web
@method
async def ping() -> Result:
return Success("pong")
class MainHandler(web.RequestHandler):
async def post(self) -> None:
request = self.request.body.decode()
if response := await async_dispatch(request):
self.write(response)
app = web.Application([(r"/", MainHandler)])
if __name__ == "__main__":
app.listen(5000)
ioloop.IOLoop.current().start()
See blog post.
Websockets¶
import asyncio
from jsonrpcserver import method, Success, Result, async_dispatch
import websockets
@method
async def ping() -> Result:
return Success("pong")
async def main(websocket, path):
if response := await async_dispatch(await websocket.recv()):
await websocket.send(response)
start_server = websockets.serve(main, "localhost", 5000)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
See blog post.
Werkzeug¶
from jsonrpcserver import method, Result, Success, dispatch
from werkzeug.serving import run_simple
from werkzeug.wrappers import Request, Response
@method
def ping() -> Result:
return Success("pong")
@Request.application
def application(request):
return Response(dispatch(request.data.decode()), 200, mimetype="application/json")
if __name__ == "__main__":
run_simple("localhost", 5000, application)
See blog post.
ZeroMQ¶
from jsonrpcserver import method, Result, Success, dispatch
import zmq
socket = zmq.Context().socket(zmq.REP)
@method
def ping() -> Result:
return Success("pong")
if __name__ == "__main__":
socket.bind("tcp://*:5000")
while True:
request = socket.recv().decode()
socket.send_string(dispatch(request))
See blog post.
ZeroMQ (asynchronous)¶
from jsonrpcserver import method, Result, Success, async_dispatch
import aiozmq
import asyncio
import zmq
@method
async def ping() -> Result:
return Success("pong")
async def main():
rep = await aiozmq.create_zmq_stream(zmq.REP, bind="tcp://*:5000")
while True:
request = (await rep.read())[0].decode()
if response := (await async_dispatch(request)).encode():
rep.write((response,))
if __name__ == "__main__":
asyncio.set_event_loop_policy(aiozmq.ZmqEventLoopPolicy())
asyncio.get_event_loop().run_until_complete(main())
See blog post.