使用 Thrift
ApacheThrift 最初是 Facebook 实现的一种支持多种编程语言、高效的远程服务调用框架,2008 年进入 Apache 开源项目。它采用中间语言(IDL,接口描述语言)定义 RPC 的接口和数据类型,通过一个编译器生成不同语言的代码(支持 C++、Java、Python、Ruby 等多种语言),其数据传输采用二进制格式,相对 XML 和 JSON 而言体积更小,对于高并发、大数据量和多语言的环境更有优势。
我们先安装它:
> wget http://mirrors.cnnic.cn/apache/thrift/0.9.3/thrift-0.9.3.tar.gz > tar zxf thrift-0.9.3.tar.gz > cd thrift-0.9.3 # 由于安装 RabbitMQ 的时候安装了 Erlang,可以禁用 Erlang > ./configure--without-erlang > make && sudo make install > cd lib/py > sudo make install
安装 Thrift 的 Python 库的时候提示了包的安装路径,在 server.py 一开始要指定这个包路径:
import sys sys.path.append('gen-py') sys.path.append('/usr/lib/python2.7/site-packages')
定义 IDL 文件
我们将把文件托管项目服务化。首先定义.thrift 文件(pastefile.thrift):
struct PasteFile{ 1:required i32 id, 2:required string filename, 3:required string filehash, 4:required string filemd5, 5:required string uploadtime, 6:required string mimetype, 7:required i64 size, 8:required string url_s, 9:required string url_i, 10:required list<i32>image_size, 11:required string url_d, 12:required string url_p, 13:required string size_humanize, 14:required string type, 15:required string quoteurl, } struct CreatePasteFileRequest{ 1:required string filehash, 2:required string filename, 3:required string mimetype, 4:optional i32 width, 5:optional i32 height, } exception ImageNotSupported{ 1:string message } exception UploadImageError{ 1:string message } exception NotFound{ 1:i32 code } exception ServiceUnavailable{ 1:string message } service PasteFileService{ PasteFile get(1:i32 pid) throws( 1:ServiceUnavailable service_error, 2:NotFound not_found ), list<string>get_file_info(1:string filename, 2:string mimetype) PasteFile create(1:CreatePasteFileRequest request) throws( 1:ServiceUnavailable service_error, 2:ImageNotSupported error, 3:UploadImageError image_error ), }
解析一下.thrift 文件的语法:
1.struct 关键字表示 Thrift 的结构体,概念上类似于一个 C 结构体,它将相关属性组合在一起。
2.Thrift 要求预先定义好字段和返回值类型,i32、i64、string 等都是 Thrift 内置的类型,当然也可以自定义类型。
3.list 表示有序列表。除此之外还支持 map(Python 中的字典)和 set(无序不重复元素集)。
4.exception 关键字表示 Thrift 的异常。
5.service 是 Thrift 的服务接口(类似于 Python 的方法)。其中包含三个接口:get、get_file_info 与 create,每个接口参数不同,但是需要定义参数的类型和顺序。每行定义的第一个字段表示接口返回的类型,比如 PasteFile get(1:i32 pid) 表示执行 get 接口返回一个 PasteFile 类型的对象。需要注意,不一定返回的对象都是定义的结构体,也可以是内置的类型,比如 get_file_info 接口返回的就是一个字符串的列表。
6.throws 块内列出了可能抛出的异常。
生成 Thrift 代码:
> thrift -r --gen py pastefile.thrift
生成的代码目录结构如下:
服务端实现
先引入 Thrift 相关的模块:
from thrift.transport import TTransport, TSocket from thrift.protocol import TBinaryProtocol from thrift.server import TServer from pastefile import PasteFileService from pastefile.ttypes import PasteFile, UploadImageError, NotFound
基于 PasteFile 实现了 RealPasteFile,需要重载两个方法。第一个是 get_url 方法,原来的用法是:
from flask import request def get_url(self, subtype, is_symlink=False): hash_or_link=self.symlink if is_symlink else self.filehash return 'http://{host}/{subtype}/{hash_or_link}'.format( subtype=subtype, host=request.host, hash_or_link=hash_or_link)
由于 request 来自 Flask,服务化之后不能使用 request.host,需要在响应之前再填充 host 这个参数,修改之后是这个样子:
def get_url(self, subtype, is_symlink=False): hash_or_link=self.symlink if is_symlink else self.filehash return 'http://%s/{subtype}/{hash_or_link}'.format( subtype=subtype, hash_or_link=hash_or_link)
也就是预先留了一个%s 占位,等待 API 返回之前再拼进去。
另一个要修改的是 create_by_upload_file,之前参数 uploaded_file 是一个上传的文件对象,如果做了服务化则需要把这个对象通过 client 传给 server,再由 server 保存,这相当于增加了复杂度和网络延迟,所以直接保存文件,在 create_by_upload_file 中只判断重复:
@classmethod def create_by_upload_file(cls, uploaded_file): rst=uploaded_file with open(rst.path) as f: filemd5=get_file_md5(f) uploaded_file=cls.get_by_md5(filemd5) if uploaded_file: os.remove(rst.path) return uploaded_file filestat=os.stat(rst.path) rst.size=filestat.st_size rst.filemd5=filemd5 return rst
再定义服务处理的类,这个类其实就是接口的封装:
class PasteFileHandler(object): # 这一步比较绕,使用 Flask 保存文件在 app.py 中执行,没有必要传输到服务端再保 存,需要预先生成文件路径 def get_file_info(self, filename, mimetype): rst=PasteFileModel(filename, mimetype, 0) return rst.filehash, rst.path # 方法的参数类型已经在 pastefile.thrift 中定义了,request 是一个 CreatePasteFileRequest 实例 def create(self, request): width=request.width height=request.height filehash=request.filehash filename=request.filename mimetype=request.mimetype uploaded_file=PasteFileModel.get_path(filehash) uploaded_file.filename=filename uploaded_file.mimetype=mimetype try: if width and height: paste_file=RealPasteFile.rsize(uploaded_file, width, height) else: paste_file,_=RealPasteFile.create_by_upload_file( uploaded_file, filehash) except: raise UploadImageError() db.session.add(paste_file) db.session.commit() return self.convert_type(paste_file) def get(self, pid): paste_file=PasteFileModel.query.filter_by(id=pid).first() if not paste_file: raise NotFound() # 如果不使用预先定义的异常类,抛出的异常都是 TApplicationException return self.convert_type(paste_file) @classmethod def convert_type(cls, paste_file): '''将模型转化为 Thrift 结构体的类型''' new_paste_file=PasteFile() for attr in ('id', 'filehash', 'filename', 'filemd5', 'uploadtime', 'mimetype', 'symlink', 'size', 'quoteurl', 'size', 'type', 'url_d', 'url_i', 'url_s', 'url_p'): val=getattr(paste_file, attr) if isinstance(val, unicode): # 因为需要传输字符串,所以对 unicode 要编码 val=val.encode('utf-8') # Thrift 不支持 Python 的时间格式,需要转换一下,在客户端再转换回来 if isinstance(val, datetime): val=str(val) setattr(new_paste_file, attr, val) return new_paste_file
启动服务的代码如下:
import logging logging.basicConfig() # 这一步很重要,可以收到 Thrift 发出来的异常日志 handler=PasteFileHandler() # Processor 用来从连接中读取数据,将处理授权给 handler(自己实现),最后将结果写到 连接上 processor=PasteFileService.Processor(handler) # 服务端使用 8200 端口,transport 是网络读写抽象层,为到来的连接创建传输对象 transport=TSocket.TServerSocket(port=8200) tfactory=TTransport.TBufferedTransportFactory() pfactory=TBinaryProtocol.TBinaryProtocolFactory() server=TServer.TThreadPoolServer( processor, transport, tfactory, pfactory) print 'Starting the server...' server.serve()
客户端实现
client.py 中同样先引入 Thrift 相关的定义:
from thrift.transport import TTransport, TSocket from thrift.protocol import TBinaryProtocol from pastefile import PasteFileService from pastefile.ttypes import ( PasteFile, CreatePasteFileRequest, UploadImageError, NotFound)
为了让客户端连接发生在服务器启动之后,而且能重用连接,我们使用了 LocalProxy 包装 client:
from werkzeug.local import LocalProxy def get_client(): # 同样使用 8200 端口,使用阻塞式 I/O 进行传输,是最常见的模式 transport=TSocket.TSocket('localhost', 8200) transport=TTransport.TBufferedTransport(transport) # 封装协议,使用二进制编码格式进行数据传输 protocol=TBinaryProtocol.TBinaryProtocol(transport) client=PasteFileService.Client(protocol) transport.open() # 打开连接 return client client=LocalProxy(get_client)
这样就可以在下面的逻辑中直接使用 client 了。
由于上传逻辑出现在两个视图中,所以抽象一个连接函数,让代码复用:
def create(uploaded_file, width=None, height=None): filename=uploaded_file.filename.encode('utf-8') mimetype=uploaded_file.mimetype.encode('utf-8') filehash, path=client.get_file_info(filename, mimetype) create_request=CreatePasteFileRequest() create_request.filename=filename create_request.mimetype=mimetype create_request.filehash=filehash # 接收上传文件,直接保存,没有必要传输到服务端再去保存 uploaded_file.save(path) if width is not None and height is not None: create_request.width=width create_request.height=height try: pastefile=client.create(create_request) except UploadImageError: # 异常是在 PasteFileHandler 的 create 方法中预先定义的 return{'r':1, 'error':'upload fail'} print isinstance(pastefile, PasteFile) # 只是验证 try : # 事实上没有必要重新 get 一次,因为 create 方法已经返回了 PasteFile 实例,这里 只是演示 paste_file=client.get(pastefile.id) except NotFound: return{'r':1, 'error':'not found'} return{'r':0, 'paste_file':paste_file}
在 app.py 里面引用 create 方法,并且应用于视图:
from client import create @app.route('/', methods=['GET', 'POST']) def index(): if request.method=='POST': uploaded_file=request.files['file'] w=request.form.get('w', None) h=request.form.get('h', None) if not uploaded_file: return abort(400) # 使用 Thrift 客户端代码请求服务端之后获得创建的文件对象 rs=create(uploaded_file, width=w, height=h) if rs['r']: return rs['error'] paste_file=rs['paste_file'] return jsonify({ 'url_d':paste_file.url_d%request.host, # 由于之前 get_url 的值中的主机 名使用了%s 占位,这里填充进去 'url_i':paste_file.url_i%request.host, 'url_s':paste_file.url_s%request.host, 'url_p':paste_file.url_p%request.host, 'filename':paste_file.filename, 'size':humanize_bytes(paste_file.size), 'uploadtime':paste_file.uploadtime, 'type':paste_file.type, 'quoteurl':paste_file.quoteurl.replace('%25s', request.host) # quoteurl 已 经是 url 编码后的结果,需要使用替换的方式 }) return render_template('index.html',**locals())
这样就实现了服务化:app.py 专注于视图逻辑;client.py 专注于请求服务;server.py 处理客户端发来的接口请求。实际生产环境中还可以考虑 gRPC 和 Nameko(http://bit.ly/1W3N8qK ),它们可作为实现 RPC 框架的参考。
如果你所在的生产环境只使用 Python 一种语言,不喜欢 Thrift 生成的不符合 Python 编码规范的代码,而且对 Python 实现的 RPC 情有独钟,可以选择饿了么开源的纯 Python 实现的 thriftpy(https://github.com/eleme/thriftpy )。
我们先安装它:
> pip install thriftpy cython
安装 cython 是因为 thriftpy 的二进制协议实现可以使用 cython 加速。接下来实现一个 add 功能的服务。首先看 Thrift 定义(calc.thrift):
service CalcService{ i64 add(1:i64 a, 2:i64 b), }
看一下服务端的实现(server_with_thriftpy.py):
import os import logging import thriftpy from thriftpy.rpc import make_server from thriftpy.protocol import TBinaryProtocolFactory from thriftpy.transport import TBufferedTransportFactory HERE=os.path.abspath(os.path.dirname(__file__)) logging.basicConfig() calc_thrift=thriftpy.load( os.path.join(HERE, 'calc.thrift'), module_name='calc_thrift') class Dispatcher(object): def add(self, a, b): return a+b server=make_server(calc_thrift.CalcService, Dispatcher(), '127.0.0.1', 8300, proto_factory=TBinaryProtocolFactory(), trans_factory=TBufferedTransportFactory()) print 'serving...' server.serve()
启动它:
> python chapter10/section2/server_with_thriftpy.py
再看一下客户端的实现(client_with_thriftpy.py):
import os import thriftpy from thriftpy.rpc import client_context from thriftpy.protocol import TBinaryProtocolFactory from thriftpy.transport import TBufferedTransportFactory HERE=os.path.abspath(os.path.dirname(__file__)) calc_thrift=thriftpy.load( os.path.join(HERE, 'calc.thrift'), module_name='calc_thrift') with client_context(calc_thrift.CalcService, '127.0.0.1', 8300, proto_factory=TBinaryProtocolFactory(), trans_factory=TBufferedTransportFactory(), timeout=None) as calc: rs=calc.add(1, 2) print 'Result is:{}'.format(rs)
现在执行这个客户端程序,就能获得两数相加的结果了:
> python client_with_thriftpy.py Result is: 3
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论