晨曦's Blog

This is a window to the soul

前言

API 为什么需要设计签名验证?为了有一定的数据抓取防御能力。
需要考虑的点:

请求参数是否已被篡改
请求来源是否合法
请求是否具有唯一性

原理

参数签名方式:它要求客户端按照约定好的算法生成签名字符串,作为请求的一部分像接口请求,服务端验算签名即可知是否合法。

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import requests
import hashlib
import time

key = "xxx" # 密钥,由服务端颁发

client = "local" # 客户端
server = "kafka" # 服务端
env = "test" # 环境
timestamp = str(time.time())[:10] # 时间
sign_str = client+server+env+timestamp+key
sign = hashlib.md5(sign_str.encode(encoding='utf-8')).hexdigest() # md5

headers = dict(client=client, server=server, env=env, timestamp=timestamp, sign=sign)

r1 = requests.get("https://www.xxx.com/hello", headers=headers)

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from sanic import Sanic
from sanic.response import text, json
import hashlib

app = Sanic("My Hello, world app")

@app.get("/hello")
async def hello_world(request):

key = "xxx" # 密钥

client = request.headers.get("client")
server = request.headers.get("server")
env = request.headers.get("env")
timestamp = request.headers.get("timestamp")
sign = request.headers.get("sign")

_sign_str = client+server+env+timestamp+key
_sign = hashlib.md5(_sign_str.encode(encoding='utf-8')).hexdigest()

if sign==_sign:
return json(list(request.headers.items()))
else:
return text("验证失败")
app.run(host='0.0.0.0',debug=True,auto_reload=True)

前言

目前 Docker Hub 上普通用户已经不能自动构建了,于是想通过 Github Actions 来实现打包并推送到 Docker Hub

实现

第一步

第一步先在项目下建一个 yml 文件,路径:.github/workflows/push2hub.yml

内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
name: Publish Docker image
on:
push:
branches:
- master
jobs:
push_to_registry:
name: Push Docker image to Docker Hub
runs-on: ubuntu-latest
steps:
- name: Check out the repo
uses: actions/checkout@v2
- name: Log in to Docker Hub
uses: docker/login-action@v1
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Push to Docker Hub
uses: docker/build-push-action@v2
with:
push: true
tags: jakehu/scripts:latest

上面 Yaml 的功能是,在每一次推送到 Master 分支上的时候,就将项目推送到 Docker Hub 上的 jakehu 用户下的 scripts 仓库里的 latest Tag

第二步

第二步需要在 github 上设置 DOCKER_USERNAMEDOCKER_PASSWORD 两个变量
UMJEbb

第三步

第三步只需要对 Master 分支进行推送即可,然后我们就能在 Actions 里面看到对应的流水线信息
kCVfE8


Github Actions 文档

前言

CentOS 8 今年开始已经停止维护了,难道就没有可以免费使用的 RHEL 了?

答案是否定的

Rocky Linux

Rocky Linux 是由 CentOS 项目的创始人 Gregory Kurtzer 领导构建的,他与原来的 Centos 一样位于 RHEL 下游。

如图所示:

微软、谷歌、亚马逊等公有云都已经接入,希望国内的公有云也能尽快的接入

Rocky Linux 官网

前记

在使用 Sanic 的过程中对 ORM 的选择可谓是痛苦的,用过官方推荐的 Tortoise ORM,也用过 SQLAlchemy

比起使用 ORM 我更喜欢原生 SQL,当我看见 databases 的时候我发现它满足了的我所有要求,支持异步驱动 aiomysql,支持原生 SQL 写法,还封装进了 SQLAlchemy,只要你想你也可以把 databases 当作 SQLAlchemy 使用

安装

安装 databases

1
pip install databases

安装数据库驱动

1
pip install aiomysql

在安装 databases 的时候会自动的安装 SQLAlchemy 目前已经支持 1.4 版本

配置

利用监听器去控制数据库的连接和断开,并将句柄放入到应用上下文中 app.ctx.db

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
./server.py

from sanic import Sanic
from databases import Database

app = Sanic('jakehu')

# 数据库
database = Database('mysql://localhost/example')
# 服务开启之前
@app.listener("before_server_start")
async def startup_db(app, loop):
await database.connect()
# 服务结束之后
@app.listener("after_server_stop")
async def shutdown_db(app, loop):
await database.disconnect()
app.ctx.db = database

@app.get("/")
async def foo_handler(request):
return text("Hello World!")

app.run(host='0.0.0.0', debug=True, auto_reload=True)

使用

最后看如何在函数中使用,利用应用上下文中的句柄进行操作,最后在利用_mapping 属性进行转换

1
2
3
4
5
6
7
@app.get("/")
async def foo_handler(request):
database= request.app.ctx.db
query = "SELECT * FROM table"
rows = await database.fetch_all(query=query)
result = [dict(row._mapping) for row in rows]
return json(result)

关于如何将 sqlalchemy.engine.row.Row 转化为 dict 可以参考

示例

前言

希望能像 npm install 那样自动的将安装的包加入到 requirements.txt 文件中,但是同时又不希望把子依赖加入其中

虽然我们能通过 pip freeze > requirements.txt 将依赖导出,但是这样导出的依赖,会把包的其他子依赖也导出,导致重新安装的时候总是提示包的版本不对

解决

通过 bashalias 或者函数来解决,在.zshrc 中添加以下函数

1
2
3
function pip-install {
pip install $1 && pip freeze | grep -w "${1}=" >> requirements.txt
}

使用

1
pip-install sanic

效果

1
2
cat requirements.txt
sanic==21.6.2

从上面文件可以看出 requirements.txt 中并没有 sanic 的其他子依赖,至于 sanic 的其他子依赖会在安装 sanic 时自动安装就不用管它了

最后

最后我们在其他地方使用项目的时候只需要安装 requirements.txt 中的包就行了

1
pip install -r requirements.txt

前言

Mac 系统中,如果利用 Homebrew 安装 LuaRocks,默认只会安装最新版本的 Lua。鉴于 lapisluajit 都只兼容 lua@5.1 版本,所以就需要自行安装 lua@5.1

兼容

第一步:利用 Homebrew 安装 luarocks

1
brew install luarocks

第二步:利用 Homebrew 安装 lua@5.1

1
brew install lua@5.1

第三步:查看 lua@5.1 的安装目录

1
2
3
brew info lua@5.1

/usr/local/Cellar/lua@5.1/5.1.5_8

第四步:利用参数 --lua-dir 以及 --lua-version 使用 5.1 版本,两个参数可以同时设置,也可以只设置一个

1
2
3
luarocks --lua-dir=/usr/local/Cellar/lua@5.1/5.1.5_8 --lua-version=5.1 install lapis

luarocks --lua-version=5.1 install lapis

通过上面设置就能兼容不同版本的 Lua

错误

安装 luaossl 时出现以下错误:

1
2
3
4
5
6
7
8
Installing https://luarocks.org/luaossl-20200709-0.src.rock

Error: Failed installing dependency: https://luarocks.org/luaossl-20200709-0.src.rock - Could not find header file for CRYPTO
No file openssl/crypto.h in /usr/local/include
No file openssl/crypto.h in /usr/include
No file openssl/crypto.h in /include
You may have to install CRYPTO in your system and/or pass CRYPTO_DIR or CRYPTO_INCDIR to the luarocks command.
Example: luarocks install luaossl CRYPTO_DIR=/usr/local

解决如下:
设置 OPENSSL_DIR 以及 CRYPTO_DIR

1
luarocks --lua-version=5.1 OPENSSL_DIR=/usr/local/Cellar/openssl@1.1/1.1.1k/ CRYPTO_DIR=/usr/local/Cellar/openssl@1.1/1.1.1k/ install lapis

最后

配置 LUA_PATHLUA_CPATH 以及 PATH,在终端中输入

1
luarocks --lua-version=5.1 path --bin

取得 LUA_PATHLUA_CPATH 写入到 ~/.zshrc

1
2
export LUA_PATH=''
export LUA_CPATH=''

最后再将.luarocks/bin 导入 PATH

1
export PATH="$HOME/.luarocks/bin:$PATH"

如果不做上面操作就会出现下面错误

1
lua entry thread aborted: runtime error: content_by_lua(nginx.conf.compiled:22):2: module 'lapis' not found:

Python 框架 Sanic 实现文件上传功能

实现

判断允许上传的类型,同时利用 UUID 生成新的文件名存储到对应的文件夹中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@app.route("/upload", methods=['POST'])
async def upload(request):
allow_type = ['.jpg', '.png', '.gif'] # 允许上传的类型
file = request.files.get('file')
type = os.path.splitext(file.name)

if type[1] not in allow_type:
return json({"code": -1, "msg": "只允许上传.jpg.png.gif类型文件"})

name = str(uuid4())+type[1]
path = "/user/data/web/upload" # 这里注意path是绝对路径

async with aiofiles.open(path+"/"+name, 'wb') as f:
await f.write(file.body)
f.close()
return json({"code": 0, "msg": "上传成功", "data": {
"name": name,
"url": "/upload/"+name
}})

上传

可以利用 Postman 上传测试,需要注意的是 header 头中的 Content-Type:multipart/form-data; 必须设置

访问

需要访问上传过后的文件,这就需要用到 Sanic 静态文件代理

1
2
path = "/user/data/web/upload" # 这里注意path是绝对路径
app.static("/upload", path)

最后访问路径为:

1
https://www.域名.com/upload/uuid.png

前记

从用 HexoNext 开始都不知道换了多少次评论系统了,最开始的多说,后来的 Valine,再后来的 Disqus。换来换去最后还是决定用 utterances

安装

utterances

第一步只需要访问:https://github.com/apps/utterances 进行安装

第二步选择存放 issues 的项目,可以跟 Github pages 放在同一个项目

配置

1
2
3
4
5
utterances:
enable: true
repo: jakehu/jakehu.github.io
issue_term: pathname
theme: github-light

最后只需要在主题配置文件中进行如上配置即可

前记

在设计数据库的时候有一个 IP 字段,是用来存多个 IP 地址。于是设计成了 Json 类型,记一下如何在 Tortoise Orm 中使用 JSON_CONTAINS

使用

Model

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from tortoise import Model, fields
from uuid import uuid4

class ag_ip_group(Model):
id = fields.IntField(pk=True)
uuid = fields.UUIDField(default=uuid4)
name = fields.CharField(64)
ip = fields.JSONField()
remark = fields.CharField(64,default='_')
status = fields.IntField(default=1)
created_time = fields.DatetimeField(null=True, auto_now_add=True)
updated_time = fields.DatetimeField(null=True, auto_now=True)

def __str__(self):
return str(self.id)

Function

1
2
3
4
5
from tortoise.functions import Function
from pypika import CustomFunction

class JsonContains(Function):
database_func = CustomFunction("JSON_CONTAINS", ["field", "value"])

Service

1
2
3
4
5
6
filter = {}
annotate = {}
annotate['json_len'] = JsonContains('ip', '"'+row['ip']+'"')
filter['json_len__gt'] = 0

await self.model.all().annotate(**annotate).filter(**filter).limit(limit).offset(offset).values()

扩展

其他的 Json 函数也可自行扩展,这里我们再扩展一种 Json 模糊查询,即:

1
select * from table where json_extract(field, '$') LIKE '%value%'

实现如下:

1
2
3
4
5
6
7
8
9
class JsonExtract(Function):
database_func = CustomFunction("JSON_EXTRACT", ["field", 'value'])

filter = {}
annotate = {}
annotate['json_str'] = JsonExtract('ip','$')
filter['json_str__icontains'] = row['ip']

await self.model.all().annotate(**annotate).filter(**filter).limit(limit).offset(offset).values()

其他

JSONField 类型字段中使用 encoder 更改 json.dumps() 参数 ensure_ascii=False,实现也非常的简单

1
2
import json
field = fields.JSONField(encoder=lambda x: json.dumps(x,ensure_ascii=False))

锦城虽云乐,不如早还家

前言

需求很简单,通过域名证书的私钥分析证书的 DNS 域名以及有效期

实现

分析后决定用 pyOpenSSL 来实现,具体代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from OpenSSL import crypto
from dateutil import parser

class Ssl():
def parse(self, certificate):
res = {
"domain": []
}
cert = crypto.load_certificate(crypto.FILETYPE_PEM, certificate)
# 有效期
datetime_struct_start = parser.parse(cert.get_notBefore().decode("UTF-8"))
datetime_struct_end = parser.parse(cert.get_notAfter().decode("UTF-8"))
res['start_time'] = datetime_struct_start.strftime('%Y-%m-%d %H:%M:%S')
res['end_time'] = datetime_struct_end.strftime('%Y-%m-%d %H:%M:%S')
# 扩展
count = cert.get_extension_count()
for i in range(count):
crt = cert.get_extension(i)
name = str(crt.get_short_name(), encoding="utf-8")
if name == "subjectAltName":
_list = str(crt).split(',')
_it = iter(_list) # 创建迭代器对象
for x in _it:
res["domain"].append(x.strip().replace("DNS:", ""))
break
return res

if __name__ == '__main__':
_certificate="""证书私钥"""
ssl = Ssl()
print(ssl.parse(_certificate))
# {'domain': ['*.jakehu.me', 'jakehu.me'], 'start_time': '2021-07-12 18:01:16', 'end_time': '2021-10-10 18:01:15'}

最后

通过 get_subject()get_issuer() 获取证书的其他信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
subject = cert.get_subject()
issuer = cert.get_issuer()

print("证书版本:", cert.get_version() + 1)
print("证书序列号:", hex(cert.get_serial_number()))
print("证书中使用的签名算法:", cert.get_signature_algorithm().decode("UTF-8"))
print("颁发者:", issuer.commonName)
datetime_struct = parser.parse(cert.get_notBefore().decode("UTF-8"))
print("有效期从:", datetime_struct.strftime('%Y-%m-%d %H:%M:%S'))
datetime_struct= parser.parse(cert.get_notAfter().decode("UTF-8"))
print("到:", datetime_struct.strftime('%Y-%m-%d %H:%M:%S'))
print("证书是否已经过期:", cert.has_expired())
print("公钥长度", cert.get_pubkey().bits())
print("公钥:\n", crypto.dump_publickey(crypto.FILETYPE_PEM, cert.get_pubkey()).decode("utf-8"))
print("主体信息:")
print("CN:通用名称 OU:机构单元名称")
print("O:机构名 L:地理位置")
print("S:州/省名 C:国名")
for item in issuer.get_components():
print(item[0].decode("utf-8"), " —— ", item[1].decode("utf-8"))
0%