如何在 Python 中使用 MongoDB 与 MongoEngine

一、介绍

MongoDB是一个免费的开源NoSQL数据库程序。它将数据存储在文档集合中,其中文档类似于传统关系数据库系统中的一行。它没有用于在集合中存储数据的固定架构;您可以将数据存储在键值对中,就像 Python 字典一样。灵活的数据模型和对水平扩展的支持等功能允许您在需求更改时进行更改或缩放数据库。

MongoEngine是一个对象文档映射器(ODM),它允许你的Python应用程序与MongoDB数据库进行交互。它提供了一个声明性 API,您可以在其中使用 Python 类和对象与数据库中的文档进行交互。这个抽象层使开发人员更容易与MongoDB数据库进行交互,有助于减少开发时间,并使数据库逻辑不易出错。

本文介绍 MongoEngine ODM 的安装和配置,定义文档架构、执行创建、读取、更新和删除 (CRUD) 操作以及从数据库中查询筛选数据的步骤。它还介绍了文档引用、迁移和元选项的基础知识。

二、准备工作

  • 在 Vultr 部署 Ubuntu 20.04 实例。
  • 创建具有 sudo 权限的非 root 用户。
  • 以非根用户身份登录您的实例。
  • 在实例上安装 MongoDB 数据库服务器。

三、设置环境

安装 MongoEngine 库。

$ pip install mongoengine

设置新的项目目录。

$ mkdir ~/mongoengine_demo

切换到项目目录。

$ cd ~/mongoengine_demo

 四、连接到数据库

您可以使用 MongoEngine 模块中的函数与 MongoDB 服务器建立连接。必须将主机名、端口、数据库等值作为参数传递。connect()

使用文本编辑器创建一个名为 的新 Python 文件。app.py

$ nano app.py

将以下内容添加到文件中,并使用 + 然后保存文件。CTRLXENTER

from mongoengine import *

client = connect('mongoengine_demo')

上面的代码使用默认主机和端口与MongoDB服务器建立连接。作为参数传递的字符串引用数据库的名称。如果数据库不存在,它会尝试使用传递给函数的名称创建新数据库。127.0.0.127017connect()

以下示例演示如何使用非默认值连接到 MongoDB 服务器。

connect('database_name', username='username', password='password', authentication_source='admin')

还可以使用该参数与多个数据库或数据库服务器建立连接。有关详细信息,请参阅 MongoEngine 文档中的多个数据库部分。alias

五、定义文档架构

存储在MongoDB数据库中的文档没有任何固定的模式。但是,定义文档架构可确保数据结构和验证,从而使数据库逻辑不易出错。

MongoEngine ODM允许您创建固定或动态文档架构。您可以从 MongoEngine 模块继承 or 类以创建新的文档类并使用字段对象定义每个字段,请参阅 MongoEngine 文档中的字段部分以查找所有可用的字段类型。DocumentDynamicDocument

编辑在上一节中创建的 Python 文件。

$ nano app.py

将以下内容添加到文件中,并使用 + 然后保存文件。CTRLXENTER

class User(Document):
    name = StringField(required=True)
    email = EmailField(required=True)
    age = IntField(required=True)

    def __repr__(self):
        return f'<User name="{self.name}">'

上面的代码创建了一个名为继承自 MongoEngine 模块中的类的文档类。它将 Python 类映射到数据库中的集合。默认情况下,它使用转换为蛇大小写的 Python 类名作为集合名称。此文档类使用固定架构,其中对象只能包含名称、电子邮件和期限。UserDocument

六、执行 CRUD 操作

MongoEngine 将每个文档类映射到数据库中的集合。创建文档类的新实例以在集合中创建新文档。本节介绍如何使用上一节中创建的文档类执行 CRUD 操作。

进入Python控制台。

$ python

导入所需的模块。

>>> from app import User

上面的命令从您在上一节中创建的 Python 文件中导入类。User

创建新条目。

>>> user1 = User(name='Example User', email=f'user@example.com', age=21)
>>> user1.save()

上面的命令从类创建一个新对象并调用该方法,该方法启动集合并在数据库中创建新文档。Usersave()

阅读第一个条目。

>>> User.objects.first()

上面的命令返回一个字典,其中包含存储在集合中的第一个文档的值。user

更新条目。

>>> user1.age = 22
>>> user1.save()

上面的命令更改对象中 age 属性的值并调用该方法,该方法将更新数据库中的文档。user1save()

验证更改。

>>> User.objects.first().age

删除条目。

>>> user1.delete()

验证更改。

>>> User.objects

退出 Python 控制台。

>>> exit()

 七、查询筛选后的数据

文档类具有允许访问存储在集合中的对象的属性。该属性是一个接受条件并返回包含筛选的文档对象的对象。本节介绍使用文档类查询筛选数据的基础知识。objectsobjectsQuerySetManagerQuerySet

进入Python控制台。

$ python

导入所需的模块。

>>> from app import User

填充数据库。

>>> user_objects = [User(name=f'Person {i}', email=f'person{i}@example.com', age=i+18) for i in range(10)]
>>> User.objects.insert(user_objects)

上述命令使用枚举值创建 10 个不同对象的列表,并将其插入到数据库中。User

使用单个条件查询文档。

>>> User.objects(age__gt=20)

上面的命令返回年龄超过 20 的文档列表。

输出

[<User name="Person 3">, <User name="Person 4">, <User name="Person 5">, <User name="Person 6">, <User name="Person 7">, <User name="Person 8">, <User name="Person 9">]

使用多个条件查询文档。

>>> User.objects(age__gt=20, age__lt=25)

上面的命令返回年龄大于 20 且小于 25 的文档列表。

输出

[<User name="Person 3">, <User name="Person 4">, <User name="Person 5">, <User name="Person 6">]

获取单个文档。

>>> User.objects(age__=19).first()

上面的命令返回一个生存期为 19 的文档对象。该方法返回单个文档对象,而不是包含文档对象的列表。first()

输出

<User name="Person 1">

退出 Python 控制台。

>>> exit()

请参阅 MongoEngine 文档中的查询数据库,以查找 中的所有可用过滤选项。QuerySetManager

 八、文档参考

MongoEngine ODM允许链接到文档架构中的其他文档。它使您能够在文档之间创建关系;与传统的关系数据库系统一样,它们将链接行的主键存储为外键。

您可以使用文档架构中的字段对象链接到其他文档。MongoEngine ODM 还支持 和 字段对象的组合以形成多对一关系。ReferenceFieldListFieldReferenceField

本节介绍如何使用文档类中的字段对象链接到不同的文档。ReferenceField

编辑 Python 文件。

$ nano app.py

将以下内容添加到 User 文档类上方的文件中,并使用 + 然后保存文件。CTRLXENTER

class Video(Document):
    title = StringField(required=True)

    def __repr__(self):
        return f'<Video title="{self.title}">'

class Course(Document):
    name = StringField(required=True)
    price = IntField(required=True)
    videos = ListField(ReferenceField(Video))

    def __repr__(self):
        return f'<Course name="{self.name}">'

上面的代码创建 Python 类,并从在数据库中创建两个新集合的类继承。文档类使用 and 字段对象在 videos 属性中存储视频对象引用的列表。VideoCourseDocumentCourseReferenceFieldListField

进入 Python 控制台。

$ python

导入所需的模块。

>>> from app import Course, Video

上面的命令从文件中导入 and 类。CourseVideoapp.py

视频集合中创建新条目。

>>> video1 = Video(title='Example Video 1').save()
>>> video2 = Video(title='Example Video 2').save()

上述命令创建文档类的两个新实例,该实例在数据库中启动并创建两个新文档。Video

课程集合中创建新条目。

>>> course1 = Course(name='Example Course 1', price=100, videos=[video1, video2]).save()

上面的命令创建文档类的新实例,该实例初始化并在数据库中创建新文档。它将文档与视频集合中的其他两个文档链接。Course

验证更改。

>>> Course.objects.first().videos
>>> Course.objects.first().videos[0].title
>>> Course.objects.first().videos[1].title

退出 Python 控制台。

>>> exit()

 九、文档迁移

NoSQL 数据库的灵活性使您可以轻松地进行文档架构更改。本节介绍如何在文档架构中进行结构更改。

编辑 Python 文件

$ nano app.py

将以下内容添加到文件中,并使用 + 然后保存文件。CTRLXENTER

class User(Document):
    name = StringField(required=True)
    email = EmailField(required=True)
    age = IntField(required=True)
    enrolled_courses = ListField(ReferenceField(Course))

    def __repr__(self):
        return f'<User name="{self.name}">'

上面的代码修改在用户集合中添加了一个名为 enrolled_courses 的新属性。和字段对象的组合允许用户文档引用多个课程文档。ListFieldReferenceField

进入Python控制台。

$ python

导入所需的模块。

>>> from app import User, Course

上面的命令从文件中导入 and 类。UserCourseapp.py

获取文档对象。

>>> user1 = User.objects.first()
>>> course1 = Course.objects.first()

上述命令从用户课程集合中获取第一个文档对象。

更新用户文档对象。

>>> user1.enrolled_courses = [course1]
>>> user1.save()

上述命令将 enrolled_courses 属性的值设置为包含文档引用对象的 Python 列表。Course

验证更改。

>>> User.objects.first().enrolled_courses
>>> User.objects.first().enrolled_courses[0].name

退出 Python 控制台。

>>> exit()

文档架构中的更改不会影响现有文档。如果要将更改应用于所有文档,则必须对集合使用该方法。有关更多信息,请参阅 MongoEngine 文档中的文档迁移部分。update_many()

十、文档元选项

文档类中的字典允许您向集合添加元数据,例如集合名称、文档索引列表、默认排序和继承选项、分片键等。本节介绍字典在文档类中的用法。metameta

编辑 Python 文件

$ nano app.py

将以下内容添加到 User 文档类下面的文件中,并使用 + then 保存文件。CTRLXENTER

class MetaExample(Document):
    age = IntField()

    meta = {
        'collection': 'example_collection',
        'indexes': ['age']
    }

    def __repr__(self):
        return f'<MetaExample age={self.age}>'

上面的代码创建了一个名为从该类继承的新 Python 类。它会创建一个名为 example_collection 的新集合,并在集合索引中添加年龄字段。MetaExampleDocument

索引是一种特殊的数据结构,它存储存储在数据库中的数据子集,使数据更易于以编程方式横向。如果没有索引,服务器必须执行完全集合扫描。如果查询存在适当的索引,则服务器可以限制必须扫描的文档数。

进入Python控制台。

$ python

导入所需的模块。

>>> from app import client, MetaExample

上面的命令从文件中导入对象和类。clientMetaExampleapp.py

初始化集合。

>>> MetaExample(age=999).save()

验证自定义集合名称。

>>> client.get_database('mongoengine_demo').list_collection_names()

验证集合索引。

>>> MetaExample.list_indexes()

退出 Python 控制台。

>>> exit()

请参阅 MongoEngine 文档中的文档集合部分,以查找可用元选项的完整列表。

十一、结论

您安装了 MongoEngine 库,与 MongoDB 数据库服务器建立了连接,定义了文档架构,执行了 CRUD 操作,并从数据库中查询了过滤的数据。您还探索了文档引用、迁移和元选项的基础知识。

如何在 Rocky Linux 8 上安装 Apache Cassandra

如果您正在寻找一个功能强大的开源分布式数据库系统,但又不想花费数小时来配置和安装它,那么 Cassandra 是适合您的解决方案。

Cassandra 是一个功能强大的开源分布式数据库系统,旨在处理许多商用服务器上的大量数据,提供高可用性,没有单点故障。相反,数据使用键值存储进行分区并分布在多个服务器上。

一、准备工作

  • 部署新的Rocky Linux 实例
  • 使用 SSH 登录服务器
  • 更新您的Rocky Linux 8 服务器
  • 创建具有 sudo 访问权限的非 root 用户

二、安装 Java OpenJDK

首先,要在系统上运行 Cassandra,必须安装 Java OpenJDK。OpenJDK 是 Java 平台的免费开源实现。下面列出了安装说明。

  1. 运行命令以安装包。在撰写本文时,Java OpenJDK 的最新版本 1.8.xxx。Java OpenJDK 的安装可能需要一些时间才能完成。dnf installjava-1.8.0-openjdk
    $ sudo dnf install java-1.8.0-openjdk -y
  2. 安装完成后,使用以下命令验证系统中已安装的 Java OpenJDK 版本。java -version
    $ java -version
    
    openjdk version "1.8.0_322"
    
    OpenJDK Runtime Environment (build 1.8.0_322-b06)
    
    OpenJDK 64-Bit Server VM (build 25.322-b06, mixed mode)

 三、安装 python

Cassandra 是用 Java 编写的,但是你也需要安装 python,因为cqlsh工具是用 python 编写的。Cqlsh 是 Cassandra 的命令行界面;你需要安装 python 来运行 Cassandra。

  1. 运行命令以在系统中安装软件包。在撰写本文时,python的最新版本是3.6.8。dnf installpython36
    & sudo dnf install python36 -y
  2. 安装完成后,使用命令验证系统中已安装的版本。python3 --version
    $ python3 --version
    
    Python 3.6.8
  3. 运行以下命令以选择默认的 Python 解释器。Cassandra 需要 python v3 或更高版本。因此,您应该从列表中选择最新的一个,并且至少是v3。选择与最新 python 版本对应的数字,然后按 Enter 键。在此演示中,它是选项 2。alternatives --config
    $ alternatives --config python
    
    1           /usr/libexec/no-python
    
    2           /usr/bin/python3

 四、安装 Apache Cassandra

现在,您的系统中已安装了所需的组件,并且可以安装 Apache Cassandra 了。

基本的 Rocky Linux 存储库没有 Cassandra 软件包,因此您需要先将其存储库添加到您的系统中。

  1. 使用文本编辑器创建一个在 /etc/yum.repos.d 目录下命名的新文件。cassandra.reponano
    sudo nano /etc/yum.repos.d/cassandra.repo
  2. 使用以下内容填充文件。指定 RPM 所在的位置()。这里的意思是从这个位置下载最新版本的Apache Cassandra 4.0.3。您始终可以选择其官方存储库中可用的最新版本。cassandra.repobaseurlhttps://downloads.apache.org/cassandra/redhat/40x/40x
    [cassandra]
    
    name=Apache Cassandra
    
    baseurl=https://downloads.apache.org/cassandra/redhat/40x/
    
    gpgcheck=1
    
    repo_gpgcheck=1
    
    gpgkey=https://downloads.apache.org/cassandra/KEYS
  3. 保存并退出文件,方法是按 Ctrl+O、Enter 和 Ctrl+X。运行该命令以使用新添加的存储库更新系统的包管理索引。dnf update
    $ sudo dnf update -y
  4. 运行 Cassandra 命令以检查新存储库是否已正确设置。您将看到新的 Cassandra 存储库在输出中启用。dnf repolist
    $ sudo  dnf repolist cassandra
    
    repo id                          repo name                               status
    
    cassandra                        Apache Cassandra                        enabled
  5. 最后,使用命令安装包。cassandradnf install
    & sudo dnf install cassandra -y
  6. 启动 Cassandra 服务。
    sudo service cassandra start
    
    Reloading systemd:                                         [  OK  ]
    
    Starting cassandra (via systemctl):                        [  OK  ]
  7. 启用 Cassandra 服务以在系统重新启动时启动。
    $ sudo systemctl enable cassandra               
    
    cassandra.service is not a native service, redirecting to systemd-sysv-install.
    
    Executing: /usr/lib/systemd/systemd-sysv-install enable cassandra
  8. 检查 Cassandra 的服务状态。
    sudo service cassandra status
    
    
    
    cassandra.service - LSB: distributed storage system for structured data
    
    
    
    Loaded: loaded (/etc/rc.d/init.d/cassandra; generated)
    
    
    
    Active: active (running) since Mon 2022-02-21 23:35:06 UTC; 4min 44s ago        

 五、保护 Cassandra

保护您的 Cassandra 集群与安装它一样重要。假设您在同一网络上有多个 Cassandra 节点。您应该在开始时保护集群,以防止攻击者访问您的数据库。

  1. 运行以下命令以创建名为 Cassandra 的新防火墙区域。您应该创建一个新区域以与 Cassandra 集群关联,以防止与系统中的其他服务发生冲突。该标志定义新防火墙是永久性的。该标志定义新的防火墙区域。firewall-cmdcassandra-cluster--permanent--new-zone
    $ sudo firewall-cmd --permanent --new-zone cassandra-cluster
    
    success
  2. 重新加载服务。firewalld
    $ sudo firewall-cmd --reload
  3. 将服务器网络 CIDR 添加到新区域中,以便客户端和服务器可以相互通信。
    $ sudo firewall-cmd --zone=cassandra-cluster --add-source=your-CIDR-here/24 --permanent
  4. 运行以下命令以允许访问新区域中 Cassandra 的默认端口。cassandra-cluster
    sudo firewall-cmd --zone=cassandra-cluster --add-port=7000/tcp --permanent
    
    sudo firewall-cmd --zone=cassandra-cluster --add-port=9042/tcp --permanent
  5. 最后,重新加载规则。此时,您的 Cassandra 集群是安全的,只能从您的 CIDR-here/24 访问。firewalld
    sudo firewall-cmd --reload

  六、测试 Cassandra

现在你已经有了一个新的 Cassandra 集群,你可以测试它是否启动并正常运行。

  1. 运行该命令以检查您的 Cassandra 集群状态。该命令将返回所有节点的信息,包括 IP 地址、每个节点的平均负载、数据中心名称、版本以及集群中每个节点的健康统计信息。nodetool status
    $ sudo nodetool status 
    
    Datacenter: datacenter1
    
    =======================
    
    Status=Up/Down
    
    |/ State=Normal/Leaving/Joining/Moving
    
    --  Address    Load       Tokens  Owns (effective)  Host ID                               Rack
    
    UN  127.0.0.1  97.22 KiB  16      100.0%            5607b149-a79e-4e3e-8d98-5b4a26ff698f  rack1
    • U 表示节点已启动。您可以通过查看输出中相应的 U 或 D 来查看哪些节点处于启动或关闭状态。
    • N 表示节点正常。
    • 地址显示节点的 IP 地址。
    • 主机 ID 是每个节点的唯一标识符。
  2. 您还可以使用该命令与 Cassandra 集群进行交互。cqlsh
    $ cqlsh
    
    Connected to Test Cluster at 127.0.0.1:9042
    
    [cqlsh 6.0.0 | Cassandra 4.0.3 | CQL spec 3.4.5 | Native protocol v5]
    
    Use HELP for help.
    
    cqlsh> 
  3. 例如,如果要更改默认名称群集(测试群集),可以使用命令进行更改。替换为所需的值。updateVultr Cluster
    $ UPDATE system.local SET cluster_name = 'Vultr Cluster' WHERE KEY = 'local';
  4. 退出 cqlsh shell。
    $ quit

下次使用该命令时,它将使用新的集群名称。此输出确认您已在系统上成功安装 Cassandra。cqlsh

    $ cqlsh

    Connected to Vultr Cluster at 127.0.0.1:9042

    [cqlsh 6.0.0 | Cassandra 4.0.3 | CQL spec 3.4.5 | Native protocol v5]

    Use HELP for help.

    cqlsh>

使用 Celery 在 Python 中执行异步任务队列

一、介绍

Celery 是基于异步消息传递的任务队列/作业队列。它可以用作应用程序的后台任务处理器,您可以在其中转储任务以在后台或任何给定时刻执行。它可以配置为同步或异步执行任务。

异步消息传递是一个系统,其中消息存储在发送方和接收方之间。这允许发送者在发送消息后立即继续处理其他事情,也可以在接收者采取任何操作之前堆叠多条消息。

Celery 的主要因素:

  • 任务
  • 任务代理
  • 结果后端
  • 程序

任务是您要发送给 Celery 以在其工作线程中执行的功能。这些是普通的 Python 函数,带有装饰器将它们与其他函数分开。

任务代理是消息传递系统,您将使用它与 Celery 通信以发送任务以供执行。Celery 支持许多代理,如 Redis、RabbitMQ、Amazon SQS 等。

结果后端再次是一个消息传递系统,但在这种情况下,Celery 使用它来存储任务执行结果,然后可以从您的应用程序访问该结果。它支持许多结果后端,如Redis,RabbitMQ(AMQP),SQLAlchemy等。

worker 是不言自明的,它是一个 Celery 进程,在后台运行,等待任务到达任务代理,通常多个工作线程一起运行以实现并发执行。

二、Celery 中的任务生命周期

Celery 执行任务的过程可以分解为:

  • 任务注册
  • 任务执行
  • 结果存储

您的应用程序将任务发送到任务代理,然后由工作线程保留执行,最后任务执行的结果存储在结果后端中。

三、Celery 的应用

了解Celery 可能有点忙,不知道它的正确应用。您可以通过许多不同的方式将其集成到应用程序中。

Celery 最常见的应用:

  • 定期执行 – 需要在间隔后定期运行的任务,例如发送每月新闻稿。
  • 第三方执行 – 与第三方服务交互的任务,例如通过 SMTP 发送电子邮件。
  • 长时间执行 – 需要很长时间才能完成执行的任务,例如压缩文件。

四、使用 Celery 创建您的第一个程序

在本节中,您将学习如何将 Celery 任务集成到 Python 程序中。

4.1、准备

要完成本指南,您将需要:

  • Python 3.7 或更高版本
  • Redis Server,按照 Ubuntu 20.04 上的安装和配置 Redis 进行操作

4.2、安装 Celery

Celery 是用 Python 编写的,可以使用 Python 的软件包安装程序(pip)进行安装。

安装最新版本的 Celery :

pip install celery

安装所需的依赖项以将 Redis 与 Celery 一起使用:

pip install celery[redis]

 4.3、编写你的第一个 Celery 任务

在这里,我们分解了一个非常基本的程序,演示了如何编写现有函数或将现有函数转换为 Celery 任务。您可以复制并粘贴最后提到的最终代码来自己测试。

导入和初始化 Celery 对象

从 celery python 包导入类并将其初始化为任何变量,这里我们使用了该变量。传递给类的第一个参数是我们应用程序的名称。Celeryapp

from celery import Celery



app = Celery(

    'tasks',

    broker="redis://localhost:6379/0",

    backend="redis://localhost:6379/0"

)

由于我们将 Redis 用于我们的代理和后端,因此我们使用关键字参数指定了它。如果不使用 Redis 的默认配置,则可以使用以下格式为环境编写连接字符串。

redis://username:password@hostname:port/db

创建基本 Celery 任务

您可以使用装饰器将任何函数转换为 Celery 任务,该装饰器添加了我们现有函数作为任务运行所需的所有必要功能。@app.task

@app.task

def multiply(a, b):

    import time

    time.sleep(5)

    return a * b

最终代码

您可以将最终代码复制并粘贴到一个名为新文件中,以按照下一节中给出的说明进行操作。tasks.py

from celery import Celery



app = Celery(

    'tasks',

    broker="redis://localhost:6379/0",

    backend="redis://localhost:6379/0"

)



@app.task

def multiply(a, b):

   import time

   time.sleep(5)

   return a * b

4.4、管理 Celery

写完任务后,您需要工作人员在您执行任务时处理任务。

启动 Celery 

通过运行给定的命令来启动 Celery 工作线程,以按照下一节中给出的说明进行操作。确保您位于保存的同一目录中tasks.py

启动 Celery :

celery -A tasks worker -n worker1 -P prefork -l INFO

预期输出:

Start Worker Output

使用的参数:

  • -A的缩写,用于指定工作人员将使用的应用程序。--app
  • -n是它的缩写,用于指定工作人员的名称。--hostname
  • -P是用于指定池类型的缩写,下面将讨论工作器类型。--pool
  • -l是它的缩写,用于指定我们工作人员的日志级别。--loglevel

您还可以使用 的缩写在后台运行工作线程。-D--detach

停止 Celery

处理完所有任务后,您可以通过运行给定的命令手动关闭工作线程。

结束运行的 Celery :

ps auxww | awk '/celery(.*)worker/ {print $2}' | xargs kill -9

4.5、在 Celery 程序中执行任务

现在,工作线程已启动并准备好处理队列,请打开新控制台并执行命令以打开 python 控制台。确保您位于保存的同一目录中pythontasks.py

导入任务:

from tasks import multiply

执行任务:

使用函数将任务执行请求发送到消息代理。.delay()

task1 = mutliply.delay(512, 100)

检查任务状态:

用于检查任务的当前状态。.state

task1.state

获取任务执行结果:

用于获取任务执行结果。.get()

task1.get()

预期输出:

Python Console Output

您可以使用演示的步骤将 Celery 集成到应用程序的工作流中。

 五、程序类型

选择正确的程序类型非常重要,因为它对执行时间和效率有重大影响。本文的这一部分将指导您了解 Celery 中可用的各种类型的工作人员。

 Celery 的类型:

  • 独奏
  • 预分叉
  • 事件小品
  • 格万特

顾名思义,Solo 是一个内联池,这意味着任务不会同时处理。它只创建一个线程,并使用该线程执行任务。

用单人池开始 Celery 程序

celery -A tasks worker --pool=solo --loglevel=info

需要逐个运行的任务的理想选择。需要放弃并发并使用独奏池的用例并不多。

Prefork 池使用 Python 内置的多处理库,它可以同时处理多个任务。线程数可以用标志调整。--concurrency

使用预叉池启动 Celery 程序

celery -A tasks worker --pool=prefork --concurrency=4 --loglevel=info

如果您的任务受 CPU 限制,则是一个理想的选择。如果任务的大部分时间都使用 CPU,则称为 CPU 受限,并且只有在 CPU 更快的情况下才能运行得更快。

CPU 绑定任务的示例:文件转换、压缩、搜索算法等。

Eventlet & Gevent pool 使用协程(也称为绿色线程)执行任务,而不是生成传统线程。它可以同时处理多个任务。可以使用 flag 调整协程的数量。--concurrency

使用事件池启动 Celery 程序

celery -A tasks worker --pool=eventlet --concurrency=500 --loglevel=info

使用 Gevent 池启动 Celery 程序

celery -A tasks worker --pool=gevent --concurrency=500 --loglevel=info

I/O 绑定任务的理想选择。当主要瓶颈是等待 I/O 操作完成的时间时,任务称为 I/O 绑定。您可以将并发数设置得很高,因为这不受可用 CPU 数量的限制,这与预分叉不同。

I/O 绑定任务的示例:发送电子邮件、发出 API 请求等。

注意:eventlet 和 gevent 不是 Python 标准库的一部分,您必须通过运行或pip install celery[eventlet]pip install celery[gevent]

六、结论

您可以使用给定的信息开始将 Celery 集成到您的应用程序中,但这还不是全部,Celery 可以实现更多。大多数 SaaS(软件即服务)Web 应用程序使用 Celery 作为后台任务处理器来执行各种操作。

如何在 Ubuntu 20.04 上使用 Python 实现 PostgreSQL 数据库事务

一、介绍

数据库事务是实现业务逻辑的 SQL 命令链。例如,在电子商务应用程序中,填写客户订单所需的 SQL 命令可能会影响 、 和 表。数据库事务解决了原子性原则,该原则指出事务在数据库中应具有全有或全无影响。如果事务中的任何 SQL 命令失败,数据库应删除(回滚)整个事务。PostgreSQL 是最受欢迎的数据库服务器之一,它支持事务以消除部分数据库更新的可能性。sales_orderssales_order_productssales_payments

本指南向您展示如何使用 PostgreSQL 实现 PostgreSQL 事务,这是一个用于连接到 PostgreSQL 服务器的高级 Python 库。psycopg2

二、准备工作

要完成本指南,请执行以下操作:

  • 部署 Ubuntu 20.04 服务器。
  • 创建一个非根 sudo 用户。
  • 预配托管的 PostgreSQL 数据库群集。
  • 找到 PostgreSQL 数据库集群的连接详细信息,位于“概述”选项卡下。本指南使用以下示例连接详细信息:
    • 用户名vultradmin
    • 密码EXAMPLE_POSTGRESQL_PASSWORD
    • 主持人SAMPLE_POSTGRESQL_DB_HOST_STRING.vultrdb.com
    • 端口16751

三、设置示例数据库

此示例数据库是存储客户及其贷款余额的银行应用程序的后端。

此应用程序使用两个表来完成数据库事务。该表存储客户的名称。然后,该表存储客户的贷款余额。稍后,本指南将介绍如何使用 Linux 命令将示例事务发送到应用程序。应用程序必须将事务作为单个工作单元完成,才能完成业务逻辑。否则,数据库应拒绝部分完成的事务。customersloanscurl

若要设置此示例应用程序,需要包连接到托管 PostgreSQL 数据库群集并创建数据库。按照以下步骤安装软件包并初始化数据库:postgresql-client

  1. 更新服务器的软件包信息索引。
    $ sudo apt update 
  2. 使用该工具安装包。aptpostgresql-client
    $ sudo apt install -y postgresql-client
  3. 登录到托管的 PostgreSQL 数据库集群。替换为适用于您的数据库的正确版本。SAMPLE_POSTGRESQL_DB_HOST_STRING.vultrdb.comhost
    $ psql -h SAMPLE_POSTGRESQL_DB_HOST_STRING.vultrdb.com -p 16751 -U vultradmin defaultdb

    输出:

    Password for user vultradmin:
  4. 输入托管 PostgreSQL 数据库集群的密码,然后按继续。ENTER输出:
    defaultdb=>
  5. 发出以下 SQL 命令以创建示例数据库。bank_db
    defaultdb=> CREATE DATABASE bank_db;

    输出:

    CREATE DATABASE
  6. 连接到新数据库。bank_db
    defaultdb=> \c bank_db;

    输出:

    ...
    
    You are now connected to database "bank_db" as user "vultradmin".
  7. 创建示例表。此表存储 、 和 。该关键字指示 PostgreSQL 服务器自动生成新的。customerscustomer_idsfirst_nameslast_namesSERIALcustomer_ids
    bank_db=> CREATE TABLE customers (
    
                  customer_id SERIAL PRIMARY KEY,
    
                  first_name VARCHAR(50),
    
                  last_name VARCHAR(50)        
    
              );

    输出:

    CREATE TABLE
  8. 创建表。此表存储客户持有的贷款帐户余额。此表中的列链接回表中的同一列。loanscustomer_idcustomers
    bank_db=> CREATE TABLE loans (
    
                  loan_id SERIAL PRIMARY KEY,
    
                  customer_id BIGINT,
    
                  amount DECIMAL(17, 4)  
    
              );

    输出:

    CREATE TABLE
  9. 从托管的 PostgreSQL 数据库集群注销。
    bank_db=> \q
  10. 按照下一步创建一个数据库类来访问您的示例 PostgreSQL 数据库。

四、 创建自定义的 PostgreSQL 数据库类

设置示例数据库后,现在需要一个连接到数据库的中心类来存储表中的数据。按照以下步骤创建类:

  1. 首先创建一个新目录,将源代码与系统文件分开。project
    $ mkdir project
  2. 导航到新目录。project
    $ cd project
  3. 在文本编辑器上打开一个新文件。postgresql_db.py
    $ nano postgresql_db.py
  4. 在文件中输入以下信息。请记住将数据库凭据(、、和)替换为 PostgreSQL 数据库集群的正确值。postgresql_db.pydb_hostdb_userdb_passdb_port
    import psycopg2
    
    
    
    class PostgresqlDb:
    
    
    
        def __init__(self):
    
    
    
            db_host = 'SAMPLE_POSTGRESQL_DB_HOST_STRING.vultrdb.com' 
    
            db_name = 'bank_db'  
    
            db_user = 'vultradmin'
    
            db_pass = 'EXAMPLE_POSTGRESQL_PASSWORD'   
    
            db_port = 16751
    
    
    
            self.db_conn = psycopg2.connect(host = db_host, database = db_name, user = db_user, password = db_pass, port = db_port)
    
    
    
        def execute_db(self, json_payload):
    
    
    
            try: 
    
    
    
                print("Starting new database transaction...")
    
    
    
                self.db_conn.autocommit = False
    
    
    
                self.cur = self.db_conn.cursor()   
    
    
    
                print("Inserting new customer to database...")                 
    
    
    
                sql_string = 'insert into customers(first_name, last_name) values(%s, %s) RETURNING customer_id'     
    
                self.cur.execute(sql_string, (json_payload['first_name'], json_payload['last_name']))
    
                customer_id = self.cur.fetchone()[0]
    
    
    
                print("Customer successfully inserted to database, new customer_id is " + str(customer_id))
    
    
    
                print("Inserting customer's loan record...")    
    
    
    
                sql_string = 'insert into loans(customer_id, amount) values(%s, %s) RETURNING loan_id'     
    
                self.cur.execute(sql_string, (customer_id, json_payload['loan_amount']))
    
                loan_id = self.cur.fetchone()[0]
    
    
    
                print("Customer loan record inserted successfully, new loan_id is " + str(loan_id))
    
    
    
                self.db_conn.commit()
    
    
    
                print("Database transaction completed successfully.")    
    
    
    
                return "Success"
    
    
    
            except (Exception, psycopg2.DatabaseError) as error:
    
    
    
                print("Database transaction failed, rolling back database changes...")
    
    
    
                self.db_conn.rollback()
    
    
    
                return str(error)
    
    
    
            finally:
    
    
    
                if self.db_conn:
    
    
    
                    self.cur.close()
    
                    self.db_conn.close()
    
                    print("Database connection closed successfully.")
  5. 保存并关闭文件。postgresql_db.py

postgresql_db.py文件解释道:

  1. 该语句从 Python 代码加载连接 PostgreSQL 数据库集群的适配器。import psycopg2psycopg2
  2. 该文件包含一个具有两个方法的类。postgresql_db.pyPostgresqlDb
    import psycopg2
    
    
    
    class PostgresqlDb:
    
    
    
        def __init__(self):
    
    
    
            ...
    
    
    
        def execute_db(self, json_payload):
    
    
    
            ...
  3. 该方法是一个构造函数,每次从类创建新对象时都会触发。__init__(...)PostgresqlDb
  4. 该方法从包含客户名称和贷款余额的 HTTP 方法获取 JSON 有效负载,并将请求转发到 PostgreSQL 数据库。execute_db(self, json_payload)POST
  5. 在该方法下,您将 PostgreSQL 参数设置为 。此指令允许您使用命令永久提交成功的事务或命令来阻止部分事务。execute_db(...)autocommitFalsecommit()rollback()
            ...
    
            try: 
    
    
    
                print("Starting new database transaction...")
    
    
    
                self.db_conn.autocommit = False
    
    
    
                self.cur = self.db_conn.cursor()  
    
            ...
  6. 仅当数据库事务中没有错误时,才会触发以下代码块。在事务下,应用程序在表中创建新记录,在表中创建另一条记录。customersloans
                ...
    
    
    
                print("Inserting new customer to database...")                 
    
    
    
                sql_string = 'insert into customers(first_name, last_name) values(%s, %s) RETURNING customer_id'     
    
                self.cur.execute(sql_string, (json_payload['first_name'], json_payload['last_name']))
    
                customer_id = self.cur.fetchone()[0]
    
    
    
                print("Customer successfully inserted to database, new customer_id is " + str(customer_id))
    
    
    
                print("Inserting customer's loan record...")    
    
    
    
                sql_string = 'insert into loans(customer_id, amount) values(%s, %s) RETURNING loan_id'     
    
                self.cur.execute(sql_string, (customer_id, json_payload['loan_amount']))
    
                loan_id = self.cur.fetchone()[0]
    
    
    
                print("Customer loan record inserted successfully, new loan_id is " + str(loan_id))
    
    
    
                self.db_conn.commit()
    
    
    
                print("Database transaction completed successfully.")    
    
    
    
                return "Success"
    
                ...
  7. 当交易失败并出现异常时,该块将触发。然后,块在每种情况下执行以关闭游标和数据库连接。except(...)finally
            ...
    
            except (Exception, psycopg2.DatabaseError) as error:
    
    
    
                print("Database transaction failed, rolling back database changes...")
    
    
    
                self.db_conn.rollback()
    
    
    
                return str(error)
    
    
    
            finally:
    
    
    
                if self.db_conn:
    
    
    
                    self.cur.close()
    
                    self.db_conn.close()
    
                    print("Database connection closed successfully.")

该课程现已准备就绪。使用以下语法将其包含在其他 Python 源代码文件中。PostgresqlDb

import postgresql_db

pg = postgresql_db.PostgresqlDb() 



resp = pg.execute_db(...)

按照下一步为 Python 应用程序创建文件。main.py

五、创建应用程序的入口点

要完成此示例应用程序,您需要一个接受端口 上的传入请求的 HTTP 服务器。Python 有一些内置库,可用于执行任务。按照以下步骤创建 HTTP 服务器:POST8080

  1. 在文本编辑器上打开一个新文件。main.py
     $ nano main.py
  2. 在文件中输入以下信息。main.py
    import http.server
    
    from http import HTTPStatus        
    
    import socketserver
    
    
    
    import json 
    
    import postgresql_db
    
    
    
    class httpHandler(http.server.SimpleHTTPRequestHandler):
    
    
    
                def do_POST(self):
    
    
    
                    content_length = int(self.headers['Content-Length'])
    
                    post_data = self.rfile.read(content_length)
    
                    json_payload = json.loads(post_data)  
    
    
    
                    self.send_response(HTTPStatus.OK)
    
                    self.send_header('Content-type', 'application/json')
    
                    self.end_headers()
    
    
    
                    pg = postgresql_db.PostgresqlDb() 
    
    
    
                    resp = pg.execute_db(json_payload)
    
    
    
                    self.wfile.write(bytes( resp + '\r\n', "utf8")) 
    
    
    
    httpServer = socketserver.TCPServer(('', 8080), httpHandler)
    
    
    
    print("Web server started at port 8080")
    
    
    
    try:
    
    
    
        httpServer.serve_forever()
    
    
    
    except KeyboardInterrupt:
    
    
    
        httpServer.server_close()
    
        print("The HTTP server is stopped.")
  3. 保存并关闭文件。main.py

main.py 文件解释说:

  1. 本节加载示例应用程序所需的所有 Python 库。、 和库加载 HTTP 功能。该模块允许您在加载自定义 PostgreSQL 数据库类时使用 JSON 数据。importhttp.serverHTTPStatussocketserverjsonpostgresql_db
    import http.server
    
    from http import HTTPStatus        
    
    import socketserver
    
    
    
    import json 
    
    
    
    import postgresql_db
    
    ...
  2. 是 HTTP 服务器的处理程序类。此类接受来自 HTTP 客户端的 JSON 有效负载。然后在此类下,and 语句调用自定义类以将数据保存到数据库,并使用该语句返回响应。httpHandlerpg = postgresql_db.PostgresqlDb()pg.execute_db(json_payload)PostgresqlDbself.wfile.write(bytes( resp + '\r\n', "utf8"))
    ...
    
    class httpHandler(http.server.SimpleHTTPRequestHandler):
    
    
    
                def do_POST(self):
    
    
    
                    content_length = int(self.headers['Content-Length'])
    
                    post_data = self.rfile.read(content_length)
    
                    json_payload = json.loads(post_data)  
    
    
    
                    self.send_response(HTTPStatus.OK)
    
                    self.send_header('Content-type', 'application/json')
    
                    self.end_headers()
    
    
    
                    pg = postgresql_db.PostgresqlDb() 
    
    
    
                    resp = pg.execute_db(json_payload)
    
    
    
                    self.wfile.write(bytes( resp + '\r\n', "utf8")) 
    
    ...
  3. 文件末尾的以下声明创建一个 Web 服务器,该服务器侦听 HTTP 请求并将请求调度到类。httpHandler
    ...
    
    httpServer = socketserver.TCPServer(('', 8080), httpHandler)
    
    
    
    print("Web server started at port 8080")
    
    
    
    try:
    
    
    
        httpServer.serve_forever()
    
    
    
    except KeyboardInterrupt:
    
    
    
        httpServer.server_close()
    
        print("The HTTP server is stopped.")

现在,您拥有应用程序所需的所有必要的源代码文件。继续执行下一步以测试应用程序。

六、 测试应用程序

对所有 Python 文件进行编码后,最后一步是安装 Python 包、下载库并测试应用程序。请按照以下步骤完成申请:pippsycopg2

  1. 安装 Python 包。pip
    $ sudo apt install -y python3-pip
  2. 使用该软件包为 PostgreSQL 服务器安装库。pippsycopg2-binary
    $ pip install psycopg2-binary

    输出:

    ...
    
    Installing collected packages: psycopg2-binary
    
    Successfully installed psycopg2-binary-2.9.5
  3. 使用该命令运行应用程序。python3
    $ python3 main.py

    输出:

    Web server started at port 8080
  4. 建立与服务器的另一个连接,并运行以下 Linux 命令,将示例 JSON 有效负载发送到应用程序。SSHcurl
        $  curl -X POST http://localhost:8080/ -H 'Content-Type: application/json' -d '{"first_name": "JOHN", "last_name": "DOE", "loan_amount": "4560"}'

    输出:

    "Success"
  5. 从运行 Web 服务器的第一个终端窗口查看下面的输出。事务成功,没有任何错误。
    Web server started at port 8080
    
    
    
    
    
    Starting new database transaction...
    
    Inserting new customer to database...
    
    Customer successfully inserted to database, new customer_id is 1
    
    Inserting customer's loan record...
    
    Customer loan record inserted successfully, new loan_id is 1
    
    Database transaction completed successfully.
    
    Database connection closed successfully.
  6. 尝试发送以下具有错误贷款金额的无效交易。而不是数值。PP
    $  curl -X POST http://localhost:8080/ -H 'Content-Type: application/json' -d '{"first_name": "JOHN", "last_name": "DOE", "loan_amount": "PP"}'

    输出:

    "invalid input syntax for type numeric: \"PP\"..."
  7. 检查第一个终端窗口的输出。这一次,事务将失败,而不对数据库进行任何更改。尽管应用程序将客户的详细信息插入数据库并获取新的 (),但整个事务将根据以下输出回滚。customer_id2
    ..
    
    Starting new database transaction...
    
    Inserting new customer to database...
    
    Customer successfully inserted to database, new customer_id is 2
    
    Inserting customer's loan record...
    
    Database transaction failed, rolling back database changes...
    
    Database connection closed successfully.
  8. 要验证更改,请登录到 PostgreSQL 数据库集群。
    $ psql -h SAMPLE_POSTGRESQL_DB_HOST_STRING.vultrdb.com -p 16751 -U vultradmin defaultdb

    输出:

    Password for user vultradmin:
  9. 输入您的密码,然后按继续。ENTER输出:
    defaultdb=>
  10. 切换到数据库。bank_db
    defaultdb=> \c bank_db;

    输出:

    You are now connected to database "bank_db" as user "vultradmin".
  11. 查询表。customers
    defaultdb=> SELECT
    
                    customer_id,
    
                    first_name,
    
                    last_name
    
                FROM customers;

    输出:

     customer_id | first_name | last_name
    
    -------------+------------+-----------
    
               1 | JOHN       | DOE
    
    (1 row)
  12. 查询表。loans
    defaultdb=> SELECT
    
                    loan_id,
    
                    customer_id,
    
                    amount
    
                FROM loans;

    输出:

     loan_id | customer_id |  amount
    
    ---------+-------------+-----------
    
           1 |           1 | 4560.0000
    
    (1 row)

上述输出确认应用程序的逻辑按预期工作。如果没有 PostgreSQL 事务逻辑,您现在应该有一个孤立的客户记录,而没有匹配的贷款记录。

七、结论

本指南向您展示如何在 Ubuntu 20.04 服务器上使用 Python 实现 PostgreSQL 数据库事务。使用本指南中的源代码创建将数据库工作单元视为一个整体的应用程序。事务可确保数据库一致性并防止可能出现的孤立记录情况。

如何在 TensorFlow 中使用 BERT 问答与 Python

一、介绍

Tensorflow 是一个开源的端到端平台,用于构建机器学习驱动的应用程序。它包含一组丰富的工具和库,用于各种任务,但主要侧重于神经网络的训练和推理。Tensorflow用于多种应用,包括:

  • 搜索引擎 – 用于部署深度神经网络进行搜索排名,如谷歌的RankBrain。
  • 汽车 – 构建专为自动驾驶设计的神经网络。
  • 教育 – 设计模型来过滤课堂上的有毒聊天消息。
  • Medicare – 用于构建神经网络以从患者数据中检测潜在的健康并发症。
  • 文本摘要和情感分析。

来自变压器的双向编码器表示 (BERT) 是一种自然语言模型,它使用基于转换器的机器学习技术来完成多种常见语言功能,包括:

  • 情感分析 – 情感推断,例如确定电影评论的极性。
  • 自然语言推理 – 确定假设是否在逻辑上遵循前提。
  • 命名实体识别 – 将非结构化文本中的信息提取到预定义的类别中,例如人员名称、位置和数量。
  • 问答 – 构建对话问题和回答系统,如聊天机器人。
  • 文本生成 – 生成与人类编写的文本无法区分的文本。
  • 文本联想查询 – 预测性文字建议,例如撰写电子邮件时 Gmail 的暗示性文字。
  • 文本摘要。

BERT经过了来自维基百科的25亿个单词和来自谷歌书语料库的8亿个单词的预训练。

本指南介绍了如何使用Python在TensorFlow中实现BERT的问答。

二、准备工作

  • Python 的工作知识。
  • 正确安装和配置的 python 工具链,包括 pip(Python 版本 >= 3.7)。

三、设置项目虚拟环境

为应用程序创建隔离的虚拟环境:

  1. 安装python软件包:virtualenv
    $ pip install virtualenv
  2. 创建项目目录:
    $ mkdir bert_QA
  3. 导航到新目录:
    $ cd bert_QA
  4. 创建虚拟环境:
    $ python3 -m venv env

    这将创建一个名为包含脚本的新文件夹,以控制虚拟环境,包括程序库。env

  5. 激活虚拟环境:
    $ source env/bin/activate

 四、安装 TensorFlow

要安装 TensorFlow,请输入以下命令:

$ pip install tensorflow

 五、Tflite 模型制造商

Tflite 模型制作器是一个库,可简化使用自定义数据集训练 Tensorflow Lite 模型的过程。它使用迁移学习来减少所需的训练数据和时间。

tflite 模型制作器库降低了在部署设备上 ML 应用程序时将 Tensorflow 神经网络模型转换为特定输入数据的复杂性。这些型号可以进行微调,以便在内存和 CPU 受限的设备(如智能手机)上运行,而不会牺牲在这些低功耗设备上运行时的精度。

本指南使用 tflite 模型制作器库来微调 BERT 模型以进行问答。

 六、安装Tflite模型制作工具

要安装 tflite 模型制作工具库,请执行以下操作:

  1. 克隆存储库:
    git clone https://github.com/tensorflow/examples
  2. 安装要求:
    pip install -r examples/tensorflow_examples/lite/model_maker/requirements.txt
  3. 安装软件包:
    pip install -e examples/tensorflow_examples/lite/model_maker/pip_package/

 七、构建精简模型

要创建负责问答的微调精简模型,请在工作目录中创建 afile:lite_model_gen.py

touch lite_model_gen.py

 7.1、导入库

通过将以下行添加到lite_model_gen.py文件来导入所需的库:

import tensorflow as tf

from tflite_model_maker import model_spec

from tflite_model_maker import question_answer

from tflite_model_maker.question_answer import DataLoader

从 tflit 模型制作者库中导入的类具有以下功能:

  • model_spec:用于选择表示模型的模型规范。
  • question_answer:用于数据加载和问答的模型训练。
  • DataLoader:提供用于在模型重新训练期间加载自定义数据的通用实用程序。

 7.2、选择型号规格

tflite模型制作器库支持BERT-Base和MobileBERT模型进行问答:

  • BERT-Base- 这是广泛用于NLP任务的标准BERT模型。
  • MobileBERT– 是 BERT-Base 的紧凑型版本,体积小约 4 倍,速度快近 6 倍。MobileBERT尽管比BERT-Base小,但仍能实现有竞争力的结果,并且更适合智能手机等功率受限的设备中的设备端用例。
  • MobileBERT-SQuAD- 该模型使用与MobileBERT相同的架构,但初始模型已经在斯坦福问答数据集(SQuAD)1.1上重新训练。SQuAD是一个阅读理解数据集,由众包工作者在一组维基百科文章中提出的问答对组成。SQuAD 1.1 包含 100,000+ 篇文章的 500+ 问答对。

要使用 MobileBERT-SQuAD 规范,请添加以下行:

# Model specification representing the model

spec = model_spec.get('mobilebert_qa_squad')

model_spec有一个方法 -,该方法将模型规范的名称作为参数。对于使用 BERT 的问答任务,它可以采用三个字符串中的任何一个作为参数:get

  • mobilebert_qa_squad:指定 MobileBERT-SQuAD。
  • mobilebert_qa:指定移动伯特。
  • bert_qa:指定 BERT-Base。

 7.3、获取训练数据

本指南使用 TriviaQA 数据集进行模型训练。TriviaQA 是一个用于阅读理解和问答的大规模数据集,包含超过650k个问答证据三元组。

若要加载数据集进行训练,应使用 TriviaQA 转换 python 脚本将其转换为 SQuAD1.1 格式。本指南将使用预转换的数据集进行训练和验证。

要下载转换后的数据集,请添加以下行:

# Download archived version of already converted datasets

train_data_path = tf.keras.utils.get_file(

      fname='triviaqa-web-train-8000.json',

      origin='https://storage.googleapis.com/download.tensorflow.org/models/tflite/dataset/triviaqa-web-train-8000.json')



validation_data_path = tf.keras.utils.get_file(

      fname='triviaqa-verified-web-dev.json',

      origin='https://storage.googleapis.com/download.tensorflow.org/models/tflite/dataset/triviaqa-verified-web-dev.json')

该函数用于从给定 URL 下载文件(如果缓存中尚不存在)。fname指定文件名,指定文件的原始 URL。这将下载数据集并返回下载文件的路径。tf.keras.utils.get_file

 7.4、加载数据集

通过添加以下行加载数据集:

# Fetch the training and validation data

train_data = DataLoader.from_squad(train_data_path, spec, is_training=True)

validation_data = DataLoader.from_squad(validation_data_path, spec, is_training=False)

DataLoader.from_squad以 SQuAD 格式加载传递的数据集,并根据给定model_spec对文本进行预处理。此方法将文件名和model_spec作为参数,可选参数包括 – 表示数据是否用于训练的布尔值。is_training

 7.5、创建模型

要创建 Tflite 模型,请执行以下操作:

# Create the model

model = question_answer.create(train_data, model_spec=spec)

类方法加载数据并训练模型以进行问答。它需要两个参数 – 训练数据和模型的规范。它还需要可选参数:question_answer.create

  • batch_size=None:用于训练的批量大小。
  • epochs=2:训练的周期数。
  • steps_per_epoch=None:在宣布一个纪元完成并开始下一个纪元之前,批次的样本。它默认为运行,直到输入数据集用尽。
  • shuffle=False:采用布尔值来确定是否应随机播放输入数据。

此方法返回问答的模型实例。

 7.6、评估模型

要使用验证数据集评估模型,请执行以下操作:

# Evaluate the model

model.evaluate(validation_data)

在模型对象上调用该方法会返回指标字典,包括分数和。evaluatef1exact_match

7.7、导出模型

要导出 tflite 模型以用于设备上的问答,请执行以下操作:

# Export the model

model.export(export_dir='.')

这会以默认格式将模型导出到当前工作目录。其他导出格式包括和。TFLITEVOCABSAVED_MODEL

7.8、模型生成代码

最终代码:lite_model_gen.py

import tensorflow as tf

from tflite_model_maker import model_spec

from tflite_model_maker import question_answer

from tflite_model_maker.question_answer import DataLoader



# Model specification representing the model

spec = model_spec.get('mobilebert_qa_squad')



# Download archived version of already converted datasets

train_data_path = tf.keras.utils.get_file(

fname='triviaqa-web-train-8000.json', origin='https://storage.googleapis.com/download.tensorflow.org/models/tflite/dataset/triviaqa-web-train-8000.json')



validation_data_path = tf.keras.utils.get_file(

fname='triviaqa-verified-web-dev.json',

origin='https://storage.googleapis.com/download.tensorflow.org/models/tflite/dataset/triviaqa-verified-web-dev.json')



# Fetch the training and validation data

train_data = DataLoader.from_squad(train_data_path, spec, is_training=True)

validation_data = DataLoader.from_squad(validation_data_path, spec, is_training=False)



# Create the model

model = question_answer.create(train_data, model_spec=spec)



# Evaluate the model

model.evaluate(validation_data)



# Export the model

model.export(export_dir='.')

 八、运行代码

运行代码:

$ python3 lite_model_gen.py

注意:代码运行时介于一小时到几小时之间,具体取决于 CPU 性能或是否存在 GPU。

输出如下所示:

…

….

Downloading data from https://storage.googleapis.com/download.tensorflow.org/models/tflite/dataset/triviaqa-web-train-8000.json

32571392/32570663 [==============================] - 0s 0us/step

32579584/32570663 [==============================] - 0s 0us/step

Downloading data from https://storage.googleapis.com/download.tensorflow.org/models/tflite/dataset/triviaqa-verified-web-dev.json

1171456/1167744 [==============================] - 0s 0us/step

1179648/1167744 [==============================] - 0s 0us/step



INFO:tensorflow:Retraining the models...

INFO:tensorflow:Retraining the models...

Epoch 1/2

1067/1067 [==============================] - 70867s 66s/step - loss: 1.1337 - start_positions_loss: 1.1310 - end_positions_loss: 1.1363

Epoch 2/2

1067/1067 [==============================] - 70983s 67s/step - loss: 0.7942 - start_positions_loss: 0.7934 - end_positions_loss: 0.7949



INFO:tensorflow:Made predictions for 200 records.

INFO:tensorflow:Made predictions for 200 records.

…

…

{'exact_match': 0.5986394557823129, 'final_f1': 0.6728435963129841}

…

…

INFO:tensorflow:TensorFlow Lite model exported successfully: ./model.tflite

成功运行后,精简模型将导出到当前项目目录。

 九、建筑问答

为了集成导出的模型以进行问答,本指南使用 TFlite 支持工具包。该工具包附带强大的库,可将TFLite模型集成到不同的平台上。

要安装:

$ pip install tflite-support

现在,创建问答脚本:

$ touch question_answer.py

9.1、导入库

导入类:BertQuestionAnswerer

from tflite_support.task.text import BertQuestionAnswerer

BertQuestionAnswerer类对文本执行问答。

9.2、创建 BertQuestionAnswerer 对象

从导出的精简模型创建BertQuestionAnswerer对象:

# Create the BertQuestionAnswerer object from a TensorFlow lite model

question_answerer = BertQuestionAnswerer.create_from_file("./model.tflite")

这将返回从模型文件创建的BertQuestionAnswerer对象。

9.3、创建问答上下文

问答需要一个上下文。这个上下文可以是提出问题的段落或句子。以下关于亚历山大大帝的文字来自维基百科。这将用作问答的上下文:

# Create context for question answering

context = "Alexander the Great was a king of the ancient Greek kingdom of Macedon. He succeeded his father Philip II to the throne in 336 BC at the age of 20, and spent most of his ruling years conducting a lengthy military campaign throughout Western Asia and Egypt. By the age of thirty, he had created one of the largest empires in history, stretching from Greece to northwestern India. He was undefeated in battle and is widely considered to be one of history's greatest and most successful military commanders. Until the age of 16, Alexander was tutored by Aristotle. In 335 BC, shortly after his assumption of kingship over Macedon, he campaigned in the Balkans and reasserted control over Thrace and Illyria before marching on the city of Thebes, which was subsequently destroyed in battle. Alexander then led the League of Corinth, and used his authority to launch the pan-Hellenic project envisaged by his father, assuming leadership over all Greeks in their conquest of Persia."

9.4、创建问题

根据上述上下文创建一个字典来保存问答对:

# Create questions

questions = {

    "Who is Alexander the Great": None,

    "Until the age of 16, who tutored Alexander": None,

}

9.5、回答问题

要回答字典中的问题:

# Answer questions

for question in questions.keys():

   answer = question_answerer.answer(context, question)

   questions[question] = answer



print(questions)

该方法有两个参数,一个上下文和一个问题。它根据上下文回答问题并返回 a.QuestionAnswererResult 是由 BertQuestionAnswerer生成的可能答案列表。answerQuestionAnswererResult

十、最终问答代码

完整的 question_answer.py 代码:

from tflite_support.task.text import BertQuestionAnswerer



# Create the BertQuestionAnswerer object from a TensorFlow lite model

question_answerer = BertQuestionAnswerer.create_from_file("./model.tflite")



# Create context for question answering

context = "Alexander the Great was a king of the ancient Greek kingdom of Macedon. He succeeded his father Philip II to the throne in 336 BC at the age of 20, and spent most of his ruling years conducting a lengthy military campaign throughout Western Asia and Egypt. By the age of thirty, he had created one of the largest empires in history, stretching from Greece to northwestern India. He was undefeated in battle and is widely considered to be one of history's greatest and most successful military commanders. Until the age of 16, Alexander was tutored by Aristotle. In 335 BC, shortly after his assumption of kingship over Macedon, he campaigned in the Balkans and reasserted control over Thrace and Illyria before marching on the city of Thebes, which was subsequently destroyed in battle. Alexander then led the League of Corinth, and used his authority to launch the pan-Hellenic project envisaged by his father, assuming leadership over all Greeks in their conquest of Persia."



# Create questions

questions = {

    "Who is Alexander the Great": None,

    "Until the age of 16, who tutored Alexander": None,

}



# Answer questions

for question in questions.keys():

   answer = question_answerer.answer(context, question)

   questions[question] = answer



print(questions)

 十一、运行代码

运行上面的代码:

$ python3 question_answer.py

这将产生以下结果:

{

  'Who is Alexander the Great': QuestionAnswererResult(answers=[

      QaAnswer(pos=Pos(start=12, end=20, logit=-1.621170163154602), text='king of the ancient Greek kingdom of Macedon.'), 

      QaAnswer(pos=Pos(start=12, end=27, logit=-2.1207242012023926), text='king of the ancient Greek kingdom of Macedon. He succeeded his father Philip II'),

      QaAnswer(pos=Pos(start=19, end=20, logit=-3.1698760986328125), text='Macedon.'), 

      QaAnswer(pos=Pos(start=26, end=27, logit=-3.3418025970458984), text='Philip II'),

      QaAnswer(pos=Pos(start=12, end=12, logit=-3.3852314949035645), text='king')]), 



  'Until the age of 16, who tutored Alexander': QuestionAnswererResult(answers=[

      QaAnswer(pos=Pos(start=121, end=121, logit=7.933090686798096), text='Aristotle.'), 

      QaAnswer(pos=Pos(start=118, end=121, logit=1.3499608039855957), text='tutored by Aristotle.'), 

      QaAnswer(pos=Pos(start=121, end=122, logit=1.0493016242980957), text='Aristotle.'), 

      QaAnswer(pos=Pos(start=110, end=121, logit=0.37497782707214355), text='Until the age of 16, Alexander was tutored by Aristotle.'), 

      QaAnswer(pos=Pos(start=118, end=119, logit=-5.260964870452881), text='tutored')])

}

返回的QuestionAnswererResult对象包含一个列表,这些列表表示所提出问题的可能答案。标记答案在上下文中的相对位置,同时表示答案文本。QaAnswerpostext

在第一个问题中,返回了五个可能的答案,其中两个是正确的,而在第二个问题中 – 5 个可能答案中有 4 个是正确的。

十二、结论

本指南介绍了如何在TensorFlow中使用BERT,方法是构建一个精简的问答模型,并使用Tflite支持库在上下文中回答问题。

教程:在 Python 中实现购物车,使用 Vultr 托管数据库进行 Redis

一、介绍

Redis 服务器是使用最广泛的开源数据库解决方案之一。Redis 将所有数据存储在内存中,以实现低延迟和高吞吐量。这些优势使 Redis 适用于存储繁忙网站的购物车数据,尤其是在旺季。

从电子商务网站下订单时,客户通常会从购物清单中选择商品,然后将商品添加到购物车。在后台,购物车驻留在数据库中。虽然使用关系数据库来处理购物车数据是可行的,但此类数据库可能无法在大量用户的情况下以最佳方式执行,并且它们的缓慢可能会对用户体验产生负面影响。

Redis 具有多种功能,您可以使用它们来加快在网站上添加、删除和显示购物车数据的过程。您可以将 Redis,,, 和命令与 Redis 哈希(字段值对的集合)结合使用,以实现所有购物车功能。hsethincrbyhdelhget

本指南向您展示如何在 Ubuntu 20.04 服务器上使用 Python 和托管 Redis 数据库实现购物车应用程序。

二、准备工作

要遵循本指南:

  • 部署 Ubuntu 20.04 服务器。
  • 创建一个非根sudo用户。
  • 配置托管的 Vultr Redis 数据库。然后,导航到您的 Redis 数据库实例并单击概述选项卡以检索您的数据库连接详细信息。本指南使用以下示例连接详细信息:
    • 用户名default
    • 密码EXAMPLE_PASSWORD
    • 主持人SAMPLE_DB_HOST_STRING.vultrdb.com
    • 端口16752

三、创建中央 Redis 类

在设计 Python 应用程序时,通常为基本函数设置单独的类,以促进代码重用并减少重复。本指南使用一个中央 Redis 类,该类处理从 Redis 服务器添加、删除、删除和列出购物车项目的不同功能。按照以下步骤创建类:

  1. 首先创建一个目录,将你的 Python 源代码与其他 Linux 文件分开。project
    $ mkdir project
  2. 切换到新目录。project
    $ cd project
  3. 在文本编辑器中打开一个新文件。redis_gateway.py
    $ nano redis_gateway.py
  4. 在文件中输入以下信息。将 、 和值替换为托管 Redis 数据库中的正确数据库信息。redis_gateway.pydb_hostdb_portdb_pass
    import redis
    
    
    
    class RedisGateway:
    
    
    
        def __init__(self):
    
    
    
            db_host = 'SAMPLE_DB_HOST_STRING.vultrdb.com'
    
            db_port = 16752
    
            db_pass = 'EXAMPLE_PASSWORD'   
    
    
    
            self.redisClient = redis.Redis(host = db_host, port = db_port, password = db_pass, ssl = 'true')
    
    
    
        def add_to_cart(self, json_payload):
    
    
    
            cart_id = json_payload["cart_id"]
    
            product_name = json_payload["product_name"]
    
            quantity = json_payload["quantity"]
    
    
    
            if self.redisClient.hexists(cart_id , product_name):
    
    
    
                self.redisClient.hincrby(cart_id , product_name, quantity)
    
    
    
            else:   
    
    
    
                self.redisClient.hset(cart_id, product_name, quantity)                   
    
    
    
            return "Item added to cart successfully."
    
    
    
        def remove_from_cart(self, json_payload):
    
    
    
            cart_id = json_payload["cart_id"]
    
            product_name = json_payload["product_name"]
    
    
    
    
    
            if self.redisClient.hexists(cart_id, product_name):
    
    
    
                self.redisClient.hdel(cart_id, product_name)
    
    
    
                return "Item removed from cart."
    
    
    
            else:   
    
    
    
                return "Item not found from cart."
    
    
    
        def get_cart(self, cart_id):
    
    
    
            result = self.redisClient.hgetall(cart_id)        
    
    
    
            return result
  5. 保存并关闭文件。redis_gateway.py

3.1、redis_gateway.py文件解释

  • 该文件有一个类。redis_gateway.pyRedisGateway
  • 该类有四种方法。RedisGateway
    1. 该函数在首次导入和初始化类时触发。__init__()
    2. 该函数包含您要添加到购物车的、该和该。在函数下,您首先通过运行函数来检查 Redis 服务器中是否存在购物车。如果购物车存在,则使用命令将新数量添加到购物车。否则,如果具有定义的购物车不存在,则从头开始创建它。然后,您将使用该命令添加新的产品名称和数量。add_to_cart(..., json_payload)json_payloadcart_idproduct_namequantityadd_to_cart()self.redisClient.hexistsself.redisClient.hincrby...cart_idself.redisClient.hset(cart_id, product_name, quantity)
    3. 该函数接受具有要从购物车中删除的产品名称的 JSON 有效负载。在发出命令之前,您使用逻辑语句检查产品是否存在于 Redis 服务器中。remove_from_cart(self, json_payload)if self.redisClient.hexists(cart_id, product_name):self.redisClient.hdel(cart_id, product_name)
    4. 该函数采用并查询 Redis 服务器以使用命令列出购物车中的所有项目。get_cart(..., cart_id)cart_idself.redisClient.hgetall(cart_id)
  • 现在准备好了。您可以在其他源代码文件中引用它,并使用以下语法使用其方法:RedisGateway
    import redis_gateway  
    
    rg = redis_gateway.RedisGateway()
    
    
    
    resp = rg.add_to_cart(json_payload);
    
    resp = rg.remove_from_cart(json_payload);
    
    resp = rg.get_cart(cart_id);

准备好中央 Redis 网关后,按照下一步为您的应用程序创建主文件。

四、 创建主 Python 文件

每个 Python 应用程序都必须有一个在启动应用程序时触发的主文件。本指南使用文件作为起点。按照以下步骤创建文件:main.py

  1. 在文本编辑器中打开一个新文件。main.py
    $ nano main.py
  2. 在文件中输入以下信息。main.py
    import http.server
    
    from http import HTTPStatus
    
    from urllib.parse import urlparse, parse_qs
    
    
    
    import socketserver
    
    import json
    
    
    
    import redis_gateway        
    
    
    
    class httpHandler(http.server.SimpleHTTPRequestHandler):
    
    
    
        def do_POST(self):
    
    
    
            content_length = int(self.headers['Content-Length'])
    
            post_data = self.rfile.read(content_length)
    
    
    
            json_payload = json.loads(post_data) 
    
    
    
            self.send_response(HTTPStatus.OK)
    
            self.end_headers()
    
    
    
            rg = redis_gateway.RedisGateway()
    
    
    
            resp = rg.add_to_cart(json_payload);                
    
    
    
            self.wfile.write(bytes(resp + '\r\n', "utf8"))
    
    
    
        def do_DELETE(self):
    
    
    
            content_length = int(self.headers['Content-Length'])
    
            post_data = self.rfile.read(content_length)
    
    
    
            json_payload = json.loads(post_data) 
    
    
    
            self.send_response(HTTPStatus.OK)
    
            self.end_headers()
    
    
    
            rg = redis_gateway.RedisGateway()
    
    
    
            resp = rg.remove_from_cart(json_payload);                
    
    
    
            self.wfile.write(bytes(resp + '\r\n', "utf8"))
    
    
    
        def do_GET(self):
    
    
    
            self.send_response(HTTPStatus.OK)
    
            self.end_headers()
    
    
    
            parsed_url = urlparse(self.path)
    
            params = parse_qs(parsed_url.query)
    
    
    
            cart_id = params['cart_id'][0]
    
    
    
            rg = redis_gateway.RedisGateway()  
    
    
    
            cart = rg.get_cart(cart_id)
    
    
    
            data = { y.decode('ascii'): cart.get(y).decode('ascii') for y in cart.keys() }
    
    
    
            resp = json.dumps(data, indent = 4, separators = (',', ': '))    
    
    
    
            self.wfile.write(bytes(resp + "\r\n", "utf8"))
    
    
    
    httpServer = socketserver.TCPServer(('', 8080), httpHandler)
    
    
    
    print("HTTP server started at port 8080...")
    
    
    
    try:
    
    
    
        httpServer.serve_forever()
    
    
    
    except KeyboardInterrupt:
    
    
    
        httpServer.server_close()
    
        print("The server is stopped.")
  3. 保存并关闭文件。main.py

4.1、main.py 文件解释

  • 该部分允许您导入应用程序所需的所有必要的 Python 模块。,,,, 和模块为应用程序提供 HTTP 功能。该模块允许您在标准 JSON 响应中格式化响应。然后,要连接到 Redis 服务器,您需要导入自定义模块。import ...http.serverHTTPStatusurlparseparse_qssocketserverjsonredis_gateway
  • 该类处理所有 HTTP 方法。这些是,,,和。这些方法与以下购物车功能匹配。httpHandlerPOSTDELETEGET
    • POST:.将商品添加到购物车。rg.add_to_cart(...)
    • DELETE:.从购物车中删除商品。rg.remove_from_cart(...)
    • GET:.列出购物车中的所有商品。rg.get_cart(...)
  • 该语句在端口上启动 HTTP 服务器并声明为处理程序类。httpServer = socketserver.TCPServer(('', 8080), httpHandler)8080httpHandler

您的应用程序现已准备就绪。按照下一步测试应用程序。

五、 测试 Redis 购物车应用程序

最后一步是安装应用程序所需的第三方 Python 模块,并运行一些测试以确保一切正常。

  1. 安装 Python 包。这是一个用于下载和安装Python模块的工具。pip
    $ sudo apt update
    
    $ sudo apt install -y python3-pip
  2. 使用软件包来安装 Python 模块。pipredis
    $ pip install redis
  3. 使用命令运行应用程序,然后使用启动文件。python3main.py
    $ python3 main.py

    输出。

    HTTP server started at port 8080...
  4. 与服务器建立另一个 SSH 连接,并使用 Linux命令向应用程序发送以下请求。curl
    • 将三个项目添加到购物车使用。abccart_id
      $ curl -X POST http://localhost:8080/ -H 'Content-Type: application/json' -d '{"cart_id": "abc", "product_name": "SMART WATCH", "quantity": "4"}'
      
      
      
      $ curl -X POST http://localhost:8080/ -H 'Content-Type: application/json' -d '{"cart_id": "abc", "product_name": "4G WIRELESS ROUTER", "quantity": "7"}'
      
      
      
      $ curl -X POST http://localhost:8080/ -H 'Content-Type: application/json' -d '{"cart_id": "abc", "product_name": "500W JUICER", "quantity": "2"}'

      输出。

      ...
      
      Item added to cart successfully.
    • 从购物车中检索商品。
      $ curl -X GET 'http://localhost:8080?cart_id=abc'

      输出。

      {
      
          "SMART WATCH": "4",
      
          "4G WIRELESS ROUTER": "7",
      
          "500W JUICER": "2"
      
      }
    • 从购物车中取出一件商品。
      $ curl -X DELETE http://localhost:8080/ -H 'Content-Type: application/json' -d '{"cart_id": "abc", "product_name": "SMART WATCH"}'

      输出。

      Item removed from cart.

 六、结论

本指南在 Ubuntu 20.04 服务器上使用 Python 和来自 Vultr 的托管 Redis 数据库实现了购物车解决方案。在创建下一个购物车应用程序时,请使用本指南中的示例源代码。Redis 购物车实现允许您的应用程序在高峰期处理许多订单,从而增强用户体验。

教程:如何在 Go、NodeJS、PHP、Python 和 redis-cli 中使用 TLS/SSL 安全连接到 Redis

一、介绍

Vultr 的托管 Redis 数据库需要源代码或 CLI 中的 TLS/SSL 连接。本文包含的代码示例演示了如何使用流行的编程语言安全地使用 TLS/SSL 进行连接。redis-cli

1.1、如何查找您的 TLS/SSL 连接网址

要找到您的 Redis 连接 URL,请导航到托管 Redis 数据库的“概述”部分,然后单击“复制 Redis URL”,这会将其放在剪贴板上。

Connection String

  • URL 由用户名、密码、主机和端口值组合而成。
  • 本文中的所有代码都使用此示例 URL,您应该将其替换为真实的 URL:
    rediss://USERNAME:YOUR_PASSWORD@YOUR_HOST:PORT_NUMBER

重要:请注意协议中的双精度,这表明连接使用 TLS/SSL 加密。ssrediss://

二、连接 Go

首先,获取图书馆。go-redis/redis

$ go get github.com/go-redis/redis/v8

接下来,打开一个名为的文件并粘贴以下内容。将示例 Redis URL 替换为您的 URL。main.go

package main



import (

    "context"

    "fmt"



    "github.com/go-redis/redis/v8"

)



var ctx = context.Background()



func main() {

    redisURL := "rediss://USERNAME:YOUR_PASSWORD@YOUR_HOST:PORT_NUMBER"



    addr, err := redis.ParseURL(redisURL)

    if err != nil {

        panic(err)

    }



    rdb := redis.NewClient(addr)



    err = rdb.Set(ctx, "key", "Hello Vultr!", 0).Err()

    if err != nil {

        panic(err)

    }



    val, err := rdb.Get(ctx, "key").Result()

    if err != nil {

        panic(err)

    }

    fmt.Println("Value is:", val)

}

这将创建一个名为 value 的键。然后,它检索密钥并打印其值。keyHello Vultr!

要运行此示例,请执行以下操作:

$ go run main.go

 三、与 NodeJS 连接

首先,安装库。ioredis

$ npm install --save ioredis

接下来,打开一个名为的文件并粘贴以下内容。将示例 Redis URL 替换为您的 URL。index.php

const Redis = require("ioredis");

const redisUrl = "rediss://USERNAME:YOUR_PASSWORD@YOUR_HOST:PORT_NUMBER"

const redis = new Redis(redisUrl);



redis.set("key", "Hello Vultr!");



redis.get("key").then(function (result) {

    console.log(`Value is: ${result}`);

    redis.disconnect();

});

这将创建一个名为 value 的键。然后,它检索密钥并打印其值。keyHello Vultr!

要运行此示例,请执行以下操作:

$ node index.js

 四、与 PHP 连接

首先,安装库。predis

$ composer require predis/predis

接下来,打开一个名为的文件并粘贴以下内容。将示例 Redis URL 替换为您的 URL。index.php

<?php



require 'vendor/autoload.php';

Predis\Autoloader::register();



$redis_url = 'rediss://USERNAME:YOUR_PASSWORD@YOUR_HOST:PORT_NUMBER';



$client = new Predis\Client($redis_url);



$client->set('key', 'Hello Vultr!');

$value = $client->get('key');



echo "Value is: {$value}";

这将创建一个名为 value 的键。然后,它检索密钥并打印其值。keyHello Vultr!

要运行此示例,请执行以下操作:

$ php index.php

 五、与 Python 连接

首先,安装 redis-py 库。

$ pip install redis

接下来,打开一个名为的文件并粘贴以下内容。将示例 Redis URL 替换为您的 URL。main.py

import redis



def main():

    redis_url = 'rediss://USERNAME:YOUR_PASSWORD@YOUR_HOST:PORT_NUMBER'

    redis_client = redis.from_url(redis_url)



    redis_client.set('key', 'Hello Vultr!')

    key = redis_client.get('key').decode('utf-8')



    print('Value is:', key)



if __name__ == '__main__':

    main()

这将创建一个名为 value 的键。然后,它检索密钥并打印其值。keyHello Vultr!

要运行此示例,请执行以下操作:

$ python main.py

 六、连接redis-cli

首先,您需要安装。redis-cli

  • 选项 1:作为操作系统的Redis 包的一部分进行安装。redis-cli
  • 选项 2:安装NodeJS 独立版本。redis-cli

接下来,从终端窗口执行以下操作。将示例 Redis URL 替换为您的 URL。

$ redis-cli -u rediss://USERNAME:YOUR_PASSWORD@YOUR_HOST:PORT_NUMBER

要检查连接,请运行命令,该命令返回所有 Redis 参数。INFO

INFO