1 2 3

FastAPI 初见

什么是 API?API 有什么用?本文从零开始教你搭建 API,认识 API 服务中的组件和 FastAPI 的使用细节。

GitHub项目地址:calendar-api

什么是 API?

简单来说,API 是软件间相互传输数据的接口。它在生活中十分常见,比如博物馆订票系统中就使用了 API. 当你在手机应用上订票时,手机实际上发送了一个 HTTP 请求给远程服务器。远程服务器解析该请求。当确认所有字段信息均准确无误后,它才会把你的订票信息录入数据库,并回调成功标识。只有当上述操作全都被正确执行时,你的手机才会显示订票成功。

API 程序通常运行在服务端 (server) 上。客户端 (client) 通过向 API 提供的网络接口发送请求,以实现对服务端的通信。服务端收到请求后,对请求进行解析。如果请求是合法的,则执行该请求,并将请求结果回调给客户端。一次典型的 API 请求大体上是这么个过程。

但是,在业务中,我们经常需要记录每次请求产生的中间状态、运行结果和日志信息等数据,那么就需要数据库的参与。服务端调用 数据库 (database) 以存储业务中产生的各种信息。

1. REST API

API 本身是高度个性化的,软件间可以用任意数据类型进行通信。但如果 API 是面向大众的,个性化将导致软件间沟通成本高企。这就需要有规范来约束其沟通方式。REST API 就是其中一种规范。REST API 提出了六项指导原则,只要 API 符合这六项指导原则,就能称之为“符合 REST 风格的 API”。

REST API 提出的六项指导原则分别是:

  • Client–server
  • Stateless
  • Cacheable
  • Uniform interface
  • Layered system
  • Code on demand (optional)

[了解更多]

PS: FastAPI 对构建 REST 风格的 API 提供良好的支持,这也是本文选用 FastAPI 的原因之一。

2. 安装 FastAPI

FastAPI 是 Python 下用于构建 API 的一个包。它的代码量少适合敏捷开发、服务稳定、支持异步,是目前搭建 API 的不二之选。

Let’s start!!!

使用 FastAPI 需要先安装两个包。有关安装的详细信息,参见这里

1.安装 fastapi

pip install fastapi

2.安装 uvicorn (或 hypercorn)

pip install uvicorn

FAQ:

❓ 什么是 uvicorn

Uvicorn is a lightning-fast ASGI server implementation, using uvloop and httptools. [reference]

❓ 什么是 ASGI server

ASGI (Asynchronous Server Gateway Interface) is a spiritual successor to WSGI, intended to provide a standard interface between async-capable Python web servers, frameworks, and applications.

Where WSGI provided a standard for synchronous Python apps, ASGI provides one for both asynchronous and synchronous apps, with a WSGI backwards-compatibility implementation and multiple servers and application frameworks. [reference]

3. 测试安装是否成功

安装完后,我们来搭一个超级简单的 API,来验证安装是否成功。调用此 API,它将回调一条 JSON 信息: {'key': 'value'}。下面的代码实现了这个功能。

首先,新建 main.py 文件,并在文件中写入如下内容。

# main.py
from fastapi import FastAPI

app = FastAPI()

@app.get('/')
def index():
    return {'key': 'value'}

然后,在命令行界面中,来到当前目录下,执行如下命令。

uvicorn main:app --reload

注:在此命令中,main 是脚本的名称(脚本名为 main.py),--reload 代表在每次脚本更新时重启 ASGI 服务。

最后,在浏览器中打开 http://127.0.0.1:8000,如果网页显示 {'key': 'value'} 的话,就说明我们的 API 搭建成功啦!

4. 用 FastAPI 搭建一个日程 API

为了能展开介绍 API 的组件和 FastAPI 的细节。接下来我打算写点有用的代码。

有用意味着它是围绕着某一项功能展开的。由于我的个人偏好,我决定写一个日程 API。它具有增、删、改、查等功能。

4.1 消息格式的设计

在开始动工前,我们首先要把消息格式设计好。

这就像是在搭建管道前,你得决定管道里将来流的是自来水还是石油。API 就像管道,数据就像流淌在管子里的内容物。因此在决定造什么管子之前,我们得先搞清楚我们的运输对象。

以下是我的设计方案,各字段的数据类型和含义如下:

{
    sid: str  # 日程 ID
    name: str  # 名称
    content: str  # 内容
    category: str  # 分类
    level: int  # 重要程度, 0: 未定义  1: 低  2: 中  3: 高 
    status: float  # 当前进度, 0 <= status <= 1
    creation_time: str  # 创建时间
    start_time: str  # 开始时间
    end_time: str  # 结束时间
}

其中,sid 是 schedule id 的缩写。它作为主键,是一条日程信息的唯一标识。

creation_time, start_time, end_time 被设计为 str 而非 datetime,是为了方便存储。与之相对应,为了防止用户在上述三个字段中输入不合法的值,后续服务端在读取这些字段时将会加一道校验,将不符合日期和时间规范的消息过滤出去。

4.2 数据库

设计完消息格式以后,我们来建数据库 (database)。因为依赖关系是:客户端依赖服务端,而服务端依赖数据库。因此我们的编写顺序是:数据库 –> 服务端 –> 客户端。

有很多数据库可供选择,这里我们采用 SQLite。因为它是 Python 自带的,无需安装,符合我们希望本文是“包含 FastAPI 的最小子集”的愿望。而且本文不打算搭建一个正经的 API,所以没必要采用正经的数据库,比如 MySQL.

关于在 Python3 中使用 SQLite 的介绍请看这里

以下提供了有关数据库事务的额外知识,它们是操作数据库的指导性原则。我相信事先了解它们将对搭建服务有所帮助。

Note: 数据库事务的 ACID 特性

  • 原子性 (atomicity)
  • 一致性 (consistency)
  • 隔离性 (isolation)
  • 持久性 (durability)

了解更多:[百度] [维基]

本文一共有两个数据库相关的脚本。build.py 用于新建 SQLite 数据库,它只需在第一次启动服务前运行一次;database_handler.py 是一个数据库操作类,它为我们操作数据库提供帮助。

# -*- coding:utf-8 -*-
# build.py

import configparser
import json
import database_handler


def build_bd():
    """创建数据库"""
    config = configparser.ConfigParser()
    config.read('db.conf')
    info = config['DEFAULT']

    dbh = database_handler.DatabaseHandler(db_name=info['db_name'])
    
    dbh.create_table(
        table_name=info['table_name'],
        columns=json.loads(info['columns']))


if __name__ == '__main__':
    build_bd()

# -*- coding:utf-8 -*-
# database_handler.py

import sqlite3


class DatabaseHandler:
    """database handler"""

    def __init__(self, db_name: str, check_same_thread: bool = True):
        """init"""
        self.db_name = db_name
        self.conn = sqlite3.connect(
            '{}.db'.format(db_name), check_same_thread=check_same_thread)
        self.c = self.conn.cursor()

    def execute(self, cmd: str):
        """Execute command"""
        self.c.execute(cmd)
        self.conn.commit()

    def create_table(self, table_name: str, columns: dict):
        """Create table"""
        lst = [str(k) + ' ' + str(v) for k, v in columns.items()]
        columns_str = ','.join(lst)

        cmd = 'CREATE TABLE {table_name}({columns_str})'

        self.execute(cmd.format(
            table_name=table_name,
            columns_str=columns_str))

    def insert_data(self, table_name: str, columns: dict, data: dict):
        """Insert a row of data"""
        lst = ["'" + str(v) + "'" if columns[k] == 'TEXT' else str(v)
               for k, v in data.items()]
        data_str = ','.join(lst)

        cmd = 'INSERT INTO {table_name} VALUES ({data_str})'

        self.execute(cmd.format(
            table_name=table_name,
            data_str=data_str))

    def update_data(self, table_name: str, columns: dict, data: dict, condition: dict):
        """Update data"""
        lst1 = [str(k) + '=' + "'" + str(v) + "'" if columns[k] == 'TEXT'
                else str(k) + '=' + str(v)
                for k, v in data.items()]
        value_str = ','.join(lst1)

        lst2 = [str(k) + '=' + "'" + str(v) + "'" if columns[k] == 'TEXT'
                else str(k) + '=' + str(v)
                for k, v in condition.items()]
        condition_str = ' AND '.join(lst2)

        cmd = 'UPDATE {table_name} SET {value_str} WHERE {condition_str}'

        self.execute(cmd.format(
            table_name=table_name,
            value_str=value_str,
            condition_str=condition_str))

    def delete_data(self, table_name: str, columns: dict, condition: dict):
        """Delete data"""
        lst = [str(k) + '=' + "'" + str(v) + "'" if columns[k] == 'TEXT'
               else str(k) + '=' + str(v)
               for k, v in condition.items()]
        condition_str = ' AND '.join(lst)

        cmd = 'DELETE FROM {table_name} WHERE {condition_str}'

        self.execute(cmd.format(
            table_name=table_name,
            condition_str=condition_str))

    def fetch_data(self, table_name: str, columns: dict, condition: dict):
        """Fetch data"""
        lst = [str(k) + '=' + "'" + str(v) + "'" if columns[k] == 'TEXT'
               else str(k) + '=' + str(v)
               for k, v in condition.items()]
        condition_str = ' AND '.join(lst)

        cmd = 'SELECT * FROM {table_name} WHERE {condition_str}'

        self.execute(cmd.format(
            table_name=table_name,
            condition_str=condition_str))

        return self.c.fetchall()

    def fetch_all(self, table_name: str):
        """Fetch data"""
        cmd = 'SELECT * FROM {table_name}'

        self.execute(cmd.format(
            table_name=table_name))

        return self.c.fetchall()

    def check_existence(self, table_name: str, columns: dict, condition: dict):
        """check the existence of item"""
        try:
            res = self.fetch_data(table_name, columns, condition)
            if len(res) == 0:
                return False
        except Exception:
            return False
        return True


if __name__ == '__main__':
    dbh = DatabaseHandler(db_name="CalendarDB")
    print(dbh.check_existence(
        'calendar',
        {"sid": "TEXT", "name": "TEXT", "content": "TEXT", "category": "TEXT", "level": "INTEGER", "status": "REAL", "creation_time": "TEXT", "start_time": "TEXT", "end_time": "TEXT"},
        {"sid": "22"}
    ))

4.3 服务端

服务端 (server) 程序负责两项功能:

  1. 提供 API 接口
  2. 处理用户请求

为了实现 1,服务端需要定义 post, delete, put, get 四种方法的响应方式。在 FastAPI 中,四种方法分别由四个函数定义。这四个函数的函数名可以随便取,但必须用装饰器标记该函数用于何种方法的响应。下文的 server.py 中定义了 post, delete, put, get 的响应方式。

在本文实现的日程 API 中,post, delete, put, get 四种方法又分别对应于数据库的增、删、改、查操作,这些数据库操作的操作细节在 method.py 中定义。

# -*- coding:utf-8 -*-
# method.py

import configparser
import json
import datetime


class Method:
    """API 操作方法"""

    def __init__(self, conf_file):
        """init"""
        self.config = configparser.ConfigParser()
        self.config.read(conf_file)  # 'db.conf'

        self.info = self.config['DEFAULT']
        self.columns = json.loads(self.info['columns'])

    def check_params(self, jsn):
        """检查参数值"""
        if jsn['level'] not in [0, 1, 2, 3]:
            return False

        if jsn['status'] < 0 or jsn['status'] > 1:
            return False

        try:
            lst = [
                jsn['creation_time'],
                jsn['start_time'],
                jsn['end_time']
            ]

            for t in lst:
                # 尝试解析时间
                _ = datetime.datetime.strptime(t, '%Y-%m-%d %H:%M:%S')

        except Exception:
            return False

        return True

    def get(self, dbh, schedule_id):
        return dbh.fetch_data(
            table_name=self.info['table_name'],
            columns=self.columns,
            condition={'sid': schedule_id})

    def post(self, dbh, schedule):
        # 检查item是否存在
        schedule_id = schedule.dict()['sid']
        if dbh.check_existence(self.info['table_name'], self.columns, {'sid': schedule_id}):
            # 如果存在
            return False

        # 检查参数值是否符合规范
        if not self.check_params(schedule.dict()):
            return False

        dbh.insert_data(
            table_name=self.info['table_name'],
            columns=self.columns,
            data=schedule.dict())

        return True

    def update(self, dbh, schedule_id, schedule):
        # 检查item是否存在
        if not dbh.check_existence(self.info['table_name'], self.columns, {'sid': schedule_id}):
            # 如果不存在
            return False

        # 检查参数值是否符合规范
        if not self.check_params(schedule.dict()):
            return False

        dbh.update_data(
            table_name=self.info['table_name'],
            columns=self.columns,
            data=schedule.dict(),
            condition={'sid': schedule_id})

        return True

    def delete(self, dbh, schedule_id):
        # 检查item是否存在
        if not dbh.check_existence(self.info['table_name'], self.columns, {'sid': schedule_id}):
            # 如果不存在
            return False

        dbh.delete_data(
            table_name=self.info['table_name'],
            columns=self.columns,
            condition={'sid': schedule_id})

        return True


if __name__ == '__main__':
    pass

# -*- coding:utf-8 -*-
# server.py

from fastapi import FastAPI
from pydantic import BaseModel
import configparser
import json

import database_handler
import method


app = FastAPI()


config = configparser.ConfigParser()
config.read('db.conf')
info = config['DEFAULT']

dbh = database_handler.DatabaseHandler(
    db_name=info['db_name'],
    check_same_thread=False)
m = method.Method(conf_file='db.conf')


class Schedule(BaseModel):
    sid: str  # ID
    name: str  # 名称
    content: str  # 内容
    category: str  # 分类
    level: int  # 重要程度, 0: 未定义  1: 低  2: 中  3: 高
    status: float  # 当前进度, 0 <= status <= 1
    creation_time: str  # 创建时间
    start_time: str  # 开始时间
    end_time: str  # 结束时间


@app.get('/')
def index():
    return {'app_name': 'calendar'}


@app.get('/schedules')
def get_schedules():
    return dbh.fetch_all(
        table_name=info['table_name'])


@app.get('/schedules/{schedule_id}')
def get_schedule(schedule_id: str):
    return m.get(dbh, schedule_id)


@app.post('/schedules')
def create_schedule(schedule: Schedule):
    if m.post(dbh, schedule):
        return schedule
    else:
        return {"errno": "1"}


@app.put('/schedules/{schedule_id}')
def update_schedule(schedule_id: str, schedule: Schedule):
    if m.update(dbh, schedule_id, schedule):
        return schedule
    else:
        return {"errno": "2"}


@app.delete('/schedules/{schedule_id}')
def delete_schedule(schedule_id: str):
    if m.delete(dbh, schedule_id):
        return {"msg": "success"}
    else:
        return {"errno": "3"}

4.4 客户端

客户端负责发送 HTTP 请求。HTTP 请求包含多种 HTTP 方法,其中 get, post, put, delete 四种方法最为常用。

Note: HTTP methods

  • GET: Use GET requests to retrieve resource representation/information only – and not to modify it in any way.
  • POST: Use POST APIs to create new subordinate resources.
  • PUT: Use PUT APIs primarily to update existing resource.
  • DELETE: As the name applies, DELETE APIs are used to delete resources.
  • PATCH: HTTP PATCH requests are to make partial update on a resource.

[了解更多]

客户端的职责就是用正确的 HTTP 方法,向服务端发送格式正确的请求信息。然后待服务端处理完成后,解析回调信息,以确认请求是否被成功执行。

使用 HTTP 协议与服务端通信需要用到 requests.

关于在 Python3 中使用 requests 的介绍请看这里

用于构建客户端的脚本仅有一个:client.py。调试 client.py 时,请务必确保操作顺序符合逻辑。如果你要删除一条数据,那么必须先添加它;如果你要更新/删除一条数据,那么它必须已经存在于数据库中。另外,请求 JSON 的格式同时在 db.conf 文件和 Schedule 类中定义。如果你需要修改消息格式,请修改这两处地方。

PS: 如果在调试 client.py 时遭遇障碍,可以先在 FastAPI 提供的单页应用 http://127.0.0.1:8000/docs 上进行调试。

# -*- coding:utf-8 -*-
# client.py

"""客户端
    Author: [email protected]
    Date: 2020-11-15
    Usage: python client.py
"""

import requests
import json


class Interface:
    """client interface"""

    def __init__(self, url, app_name):
        self.url = url
        self.app_url = url + '/' + app_name

    def get(self, url, sid):
        """get schedule"""
        url = url + '/' + sid
        return requests.get(url)

    def get_all(self, url, payload=None):
        """get schedules"""
        return requests.get(url, params=payload)

    def post(self, url, data):
        """post schedule"""
        return requests.post(url, data=json.dumps(data))

    def update(self, url, data):
        """update schedule"""
        sid = data['sid']
        url = url + '/' + sid
        return requests.put(url, data=json.dumps(data))

    def delete(self, url, sid):
        """delete schedule"""
        url = url + '/' + sid
        return requests.delete(url)


if __name__ == '__main__':
    url = "http://127.0.0.1:8000"
    i = Interface(url, 'schedules')

5. 日程 API 的食用方法

  1. 从 GitHub 上克隆本项目的代码仓库
  2. 安装 FastAPI 的依赖包,参考本文第 1 节
  3. 运行 python build.py 以创建 SQLite 数据库(仅需第一次使用时执行)
  4. 运行 uvicorn server:app --reload 以启用 API 服务
  5. 运行 client.py 中的测试代码,或在 http://127.0.0.1:8000/docs 中进行调试