返回介绍

使用 Thrift

发布于 2025-04-20 18:52:17 字数 12178 浏览 0 评论 0 收藏

ApacheThrift 最初是 Facebook 实现的一种支持多种编程语言、高效的远程服务调用框架,2008 年进入 Apache 开源项目。它采用中间语言(IDL,接口描述语言)定义 RPC 的接口和数据类型,通过一个编译器生成不同语言的代码(支持 C++、Java、Python、Ruby 等多种语言),其数据传输采用二进制格式,相对 XML 和 JSON 而言体积更小,对于高并发、大数据量和多语言的环境更有优势。

我们先安装它:

> wget http://mirrors.cnnic.cn/apache/thrift/0.9.3/thrift-0.9.3.tar.gz
> tar zxf thrift-0.9.3.tar.gz
> cd thrift-0.9.3
# 由于安装 RabbitMQ 的时候安装了 Erlang,可以禁用 Erlang
> ./configure--without-erlang
> make && sudo make install
> cd lib/py
> sudo make install

安装 Thrift 的 Python 库的时候提示了包的安装路径,在 server.py 一开始要指定这个包路径:

import sys
    
sys.path.append('gen-py')
sys.path.append('/usr/lib/python2.7/site-packages')

定义 IDL 文件

我们将把文件托管项目服务化。首先定义.thrift 文件(pastefile.thrift):

struct PasteFile{
    1:required i32 id,
    2:required string filename,
    3:required string filehash,
    4:required string filemd5,
    5:required string uploadtime,
    6:required string mimetype,
    7:required i64 size,
    8:required string url_s,
    9:required string url_i,
    10:required list<i32>image_size,
    11:required string url_d,
    12:required string url_p,
    13:required string size_humanize,
    14:required string type,
    15:required string quoteurl,
}
     
struct CreatePasteFileRequest{
    1:required string filehash,
    2:required string filename,
    3:required string mimetype,
    4:optional i32 width,
    5:optional i32 height,
}
     
exception ImageNotSupported{
    1:string message
}
     
exception UploadImageError{
    1:string message
}
     
exception NotFound{
    1:i32 code
}
     
exception ServiceUnavailable{
    1:string message
}
     
service PasteFileService{
   PasteFile get(1:i32 pid)
       throws(
           1:ServiceUnavailable service_error,
           2:NotFound not_found
       ),
   list<string>get_file_info(1:string filename, 2:string mimetype)
   PasteFile create(1:CreatePasteFileRequest request)
       throws(
           1:ServiceUnavailable service_error,
           2:ImageNotSupported error,
           3:UploadImageError image_error
       ),
}

解析一下.thrift 文件的语法:

1.struct 关键字表示 Thrift 的结构体,概念上类似于一个 C 结构体,它将相关属性组合在一起。

2.Thrift 要求预先定义好字段和返回值类型,i32、i64、string 等都是 Thrift 内置的类型,当然也可以自定义类型。

3.list 表示有序列表。除此之外还支持 map(Python 中的字典)和 set(无序不重复元素集)。

4.exception 关键字表示 Thrift 的异常。

5.service 是 Thrift 的服务接口(类似于 Python 的方法)。其中包含三个接口:get、get_file_info 与 create,每个接口参数不同,但是需要定义参数的类型和顺序。每行定义的第一个字段表示接口返回的类型,比如 PasteFile get(1:i32 pid) 表示执行 get 接口返回一个 PasteFile 类型的对象。需要注意,不一定返回的对象都是定义的结构体,也可以是内置的类型,比如 get_file_info 接口返回的就是一个字符串的列表。

6.throws 块内列出了可能抛出的异常。

生成 Thrift 代码:

> thrift -r --gen py pastefile.thrift

生成的代码目录结构如下:

服务端实现

先引入 Thrift 相关的模块:

from thrift.transport import TTransport, TSocket
from thrift.protocol import TBinaryProtocol
from thrift.server import TServer
    
from pastefile import PasteFileService
from pastefile.ttypes import PasteFile, UploadImageError, NotFound

基于 PasteFile 实现了 RealPasteFile,需要重载两个方法。第一个是 get_url 方法,原来的用法是:

from flask import request

def get_url(self, subtype, is_symlink=False):
    hash_or_link=self.symlink if is_symlink else self.filehash
    return 'http://{host}/{subtype}/{hash_or_link}'.format(
       subtype=subtype, host=request.host, hash_or_link=hash_or_link)

由于 request 来自 Flask,服务化之后不能使用 request.host,需要在响应之前再填充 host 这个参数,修改之后是这个样子:

def get_url(self, subtype, is_symlink=False):
    hash_or_link=self.symlink if is_symlink else self.filehash
    return 'http://%s/{subtype}/{hash_or_link}'.format(
         subtype=subtype, hash_or_link=hash_or_link)

也就是预先留了一个%s 占位,等待 API 返回之前再拼进去。

另一个要修改的是 create_by_upload_file,之前参数 uploaded_file 是一个上传的文件对象,如果做了服务化则需要把这个对象通过 client 传给 server,再由 server 保存,这相当于增加了复杂度和网络延迟,所以直接保存文件,在 create_by_upload_file 中只判断重复:

@classmethod
def create_by_upload_file(cls, uploaded_file):
    rst=uploaded_file
    with open(rst.path) as f:
        filemd5=get_file_md5(f)
        uploaded_file=cls.get_by_md5(filemd5)
        if uploaded_file:
            os.remove(rst.path)
            return uploaded_file
    filestat=os.stat(rst.path)
    rst.size=filestat.st_size
    rst.filemd5=filemd5
return rst

再定义服务处理的类,这个类其实就是接口的封装:

class PasteFileHandler(object):
    # 这一步比较绕,使用 Flask 保存文件在 app.py 中执行,没有必要传输到服务端再保
       存,需要预先生成文件路径
    def get_file_info(self, filename, mimetype):
        rst=PasteFileModel(filename, mimetype, 0)
        return rst.filehash, rst.path
     
    # 方法的参数类型已经在 pastefile.thrift 中定义了,request 是一个
        CreatePasteFileRequest 实例
    def create(self, request):
        width=request.width
        height=request.height
        filehash=request.filehash
        filename=request.filename
        mimetype=request.mimetype
     
        uploaded_file=PasteFileModel.get_path(filehash)
        uploaded_file.filename=filename
        uploaded_file.mimetype=mimetype
        try:
            if width and height:
                paste_file=RealPasteFile.rsize(uploaded_file, width, height)
            else:
                paste_file,_=RealPasteFile.create_by_upload_file(
                    uploaded_file, filehash)
        except:
            raise UploadImageError()
        db.session.add(paste_file)
        db.session.commit()
        return self.convert_type(paste_file)
    
def get(self, pid):
    paste_file=PasteFileModel.query.filter_by(id=pid).first()
    if not paste_file:
       raise NotFound() # 如果不使用预先定义的异常类,抛出的异常都是
           TApplicationException
    return self.convert_type(paste_file)
    
@classmethod
def convert_type(cls, paste_file):
    '''将模型转化为 Thrift 结构体的类型'''
    new_paste_file=PasteFile()
    for attr in ('id', 'filehash', 'filename', 'filemd5', 'uploadtime',
                'mimetype', 'symlink', 'size', 'quoteurl', 'size', 'type',
                'url_d', 'url_i', 'url_s', 'url_p'):
        val=getattr(paste_file, attr)
        if isinstance(val, unicode):
            # 因为需要传输字符串,所以对 unicode 要编码
            val=val.encode('utf-8')
        # Thrift 不支持 Python 的时间格式,需要转换一下,在客户端再转换回来
        if isinstance(val, datetime):
            val=str(val)
        setattr(new_paste_file, attr, val)
    return new_paste_file

启动服务的代码如下:

import logging
logging.basicConfig() # 这一步很重要,可以收到 Thrift 发出来的异常日志
handler=PasteFileHandler()
# Processor 用来从连接中读取数据,将处理授权给 handler(自己实现),最后将结果写到
    连接上
processor=PasteFileService.Processor(handler)
# 服务端使用 8200 端口,transport 是网络读写抽象层,为到来的连接创建传输对象
transport=TSocket.TServerSocket(port=8200)
tfactory=TTransport.TBufferedTransportFactory()
pfactory=TBinaryProtocol.TBinaryProtocolFactory()
    
server=TServer.TThreadPoolServer(
    processor, transport, tfactory, pfactory)
print 'Starting the server...'
server.serve()

客户端实现

client.py 中同样先引入 Thrift 相关的定义:

from thrift.transport import TTransport, TSocket
from thrift.protocol import TBinaryProtocol
    
from pastefile import PasteFileService
from pastefile.ttypes import (
    PasteFile, CreatePasteFileRequest, UploadImageError,
   NotFound)

为了让客户端连接发生在服务器启动之后,而且能重用连接,我们使用了 LocalProxy 包装 client:

from werkzeug.local import LocalProxy
    
    
def get_client():
    # 同样使用 8200 端口,使用阻塞式 I/O 进行传输,是最常见的模式
    transport=TSocket.TSocket('localhost', 8200)
    transport=TTransport.TBufferedTransport(transport)
    # 封装协议,使用二进制编码格式进行数据传输
    protocol=TBinaryProtocol.TBinaryProtocol(transport)
    client=PasteFileService.Client(protocol)
    transport.open() # 打开连接
    return client
    
client=LocalProxy(get_client)

这样就可以在下面的逻辑中直接使用 client 了。

由于上传逻辑出现在两个视图中,所以抽象一个连接函数,让代码复用:

def create(uploaded_file, width=None, height=None):
    filename=uploaded_file.filename.encode('utf-8')
    mimetype=uploaded_file.mimetype.encode('utf-8')
    filehash, path=client.get_file_info(filename, mimetype)
     
    create_request=CreatePasteFileRequest()
     
    create_request.filename=filename
    create_request.mimetype=mimetype
    create_request.filehash=filehash
     
    # 接收上传文件,直接保存,没有必要传输到服务端再去保存
    uploaded_file.save(path)
if width is not None and height is not None:
   create_request.width=width
   create_request.height=height
try:
   pastefile=client.create(create_request)
except UploadImageError: # 异常是在 PasteFileHandler 的 create 方法中预先定义的
   return{'r':1, 'error':'upload fail'}
     
print isinstance(pastefile, PasteFile) # 只是验证
     
try : # 事实上没有必要重新 get 一次,因为 create 方法已经返回了 PasteFile 实例,这里
     只是演示
     paste_file=client.get(pastefile.id)
except NotFound:
     return{'r':1, 'error':'not found'}
     
return{'r':0, 'paste_file':paste_file}

在 app.py 里面引用 create 方法,并且应用于视图:

from client import create
     
    
@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method=='POST':
        uploaded_file=request.files['file']
        w=request.form.get('w', None)
        h=request.form.get('h', None)
        if not uploaded_file:
           return abort(400)
     
        # 使用 Thrift 客户端代码请求服务端之后获得创建的文件对象
        rs=create(uploaded_file, width=w, height=h)
        if rs['r']:
           return rs['error']
     
        paste_file=rs['paste_file']
     
        return jsonify({
            'url_d':paste_file.url_d%request.host, # 由于之前 get_url 的值中的主机
                 名使用了%s 占位,这里填充进去
            'url_i':paste_file.url_i%request.host,
            'url_s':paste_file.url_s%request.host,
            'url_p':paste_file.url_p%request.host,
            'filename':paste_file.filename,
            'size':humanize_bytes(paste_file.size),
            'uploadtime':paste_file.uploadtime,
            'type':paste_file.type,
            'quoteurl':paste_file.quoteurl.replace('%25s', request.host) #  quoteurl 已
                经是 url 编码后的结果,需要使用替换的方式
    })
return render_template('index.html',**locals())

这样就实现了服务化:app.py 专注于视图逻辑;client.py 专注于请求服务;server.py 处理客户端发来的接口请求。实际生产环境中还可以考虑 gRPC 和 Nameko(http://bit.ly/1W3N8qK ),它们可作为实现 RPC 框架的参考。

如果你所在的生产环境只使用 Python 一种语言,不喜欢 Thrift 生成的不符合 Python 编码规范的代码,而且对 Python 实现的 RPC 情有独钟,可以选择饿了么开源的纯 Python 实现的 thriftpy(https://github.com/eleme/thriftpy )。

我们先安装它:

> pip install thriftpy cython

安装 cython 是因为 thriftpy 的二进制协议实现可以使用 cython 加速。接下来实现一个 add 功能的服务。首先看 Thrift 定义(calc.thrift):

service CalcService{
    i64 add(1:i64 a, 2:i64 b),
}

看一下服务端的实现(server_with_thriftpy.py):

import os
import logging
    
import thriftpy
from thriftpy.rpc import make_server
from thriftpy.protocol import TBinaryProtocolFactory
from thriftpy.transport import TBufferedTransportFactory
    
HERE=os.path.abspath(os.path.dirname(__file__))
logging.basicConfig()
    
calc_thrift=thriftpy.load(
    os.path.join(HERE, 'calc.thrift'),
    module_name='calc_thrift')
    
class Dispatcher(object):
    def add(self, a, b):
    return a+b
    
    
server=make_server(calc_thrift.CalcService,
                  Dispatcher(),
                  '127.0.0.1', 8300,
                  proto_factory=TBinaryProtocolFactory(),
                  trans_factory=TBufferedTransportFactory())
print 'serving...'
server.serve()

启动它:

> python chapter10/section2/server_with_thriftpy.py

再看一下客户端的实现(client_with_thriftpy.py):

import os
    
import thriftpy
from thriftpy.rpc import client_context
from thriftpy.protocol import TBinaryProtocolFactory
from thriftpy.transport import TBufferedTransportFactory
    
HERE=os.path.abspath(os.path.dirname(__file__))
    
calc_thrift=thriftpy.load(
    os.path.join(HERE, 'calc.thrift'),
    module_name='calc_thrift')
    
with client_context(calc_thrift.CalcService,
                    '127.0.0.1', 8300,
                    proto_factory=TBinaryProtocolFactory(),
                    trans_factory=TBufferedTransportFactory(),
                    timeout=None) as calc:
   rs=calc.add(1, 2)
   print 'Result is:{}'.format(rs)

现在执行这个客户端程序,就能获得两数相加的结果了:

> python client_with_thriftpy.py
Result is: 3

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。