晨曦's Blog

This is a window to the soul

自适应将 Bytes 格式化为可读性更高的单位

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
formatSize(bytes) {
const sizes = ["Bytes", "KB", "MB", "GB", "TB"];
if (bytes == 0) {
return bytes + " " + sizes[0];
}
let flag = "";
if (bytes < 0) {
bytes = Math.abs(bytes);
flag = "-";
}
const i = parseInt(Math.floor(Math.log(bytes) / Math.log(1024)));
if (i == 0) {
return bytes + " " + sizes[i];
}
return flag + (bytes / Math.pow(1024, i)).toFixed(4) + " " + sizes[i];
},

效果图如下:

count

1
2
3
4
5
select(func.count())
.select_from(table)
.where(
table.c.status == 1,
)

count if

1
2
3
4
5
6
7
func.COUNT(
func.IF(
user.c.province == "重庆",
True,
None,
)
).label("province"),

exists

1
2
3
4
5
6
7
8
9
select(["*"])
.select_from(table1)
.where(
table1.c.status == 1,
exists()
.where(
table1.c.id == table2.c.table1_id,
),
)

如果要使用 not exists 只需要在 exists() 前加上 ~ 变成 ~exists()

动态条件

1
2
3
conditions = []
conditions.append(user.c.sex == 1)
select(["*"]).select_from(user).where(and_(*conditions))

update

1
2
3
4
5
user.update()
.where(
user.c.id == 1
)
.values(score=user.c.score + 1)

order_by

1
2
3
4
#  升序
.order_by(user.c.id)
# 降序
.order_by(user.c.id.desc())

未完待续…

环境

GO 的环境安装可以使用 brew

1
brew install go

接下来只需要配置一下对应的环境信息

1
2
3
4
5
export GOPATH=/Users/jakehu/Documents/go
export GOBIN=$GOPATH/bin
export PATH=$PATH:$GOBIN
export GOPROXY=https://goproxy.cn,direct
export GO111MODULE=on

VSCODE

利用 goimports 格式化 import 排序使用

1
go install -v golang.org/x/tools/cmd/goimports@latest

利用 golangci-lint 做静态代码检查

1
go install -v github.com/golangci/golangci-lint/cmd/golangci-lint@latest

vscode 配置

1
2
3
4
5
6
7
8
9
10
11
// GO
"go.toolsManagement.autoUpdate": true,
"go.useLanguageServer": true,
"gopls": {
"experimentalWorkspaceModule": true,
},
"go.autocompleteUnimportedPackages": true,
"go.formatTool": "gofmt",
"go.lintTool": "golangci-lint",
"go.lintOnSave": "workspace",
// GO

前言

CuratorElastic 官方发布的一个管理 Elasticsearch 索引的工具,可以完成许多索引生命周期的管理工作,例如清理创建时间超过 7 天的索引、每天定时备份指定的索引、定时将索引从热节点迁移至冷节点等等。

安装

PIP

如果没有安装 pip 先安装 pip

1
wget -O /etc/yum.repos.d/epel.repo http://mirrors.aliyun.com/repo/epel-7.repo

利用阿里的 epel

1
yum -y install python-pip

Curator

Curator 本身是基于 Python 实现,所以可以使用 pip 安装

1
pip install elasticsearch-curator

升级

1
pip install -U elasticsearch-curator

查看版本

1
2
# curator --version
curator, version 5.8.4

使用

配置

新建配置文件 curator.yml,具体格式可以参考官方默认的配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
---
client:
hosts:
- 127.0.0.1
port: 9200
url_prefix:
use_ssl: False
certificate:
client_cert:
client_key:
ssl_no_validate: False
username: elastic # elastic用户
password: password # elastic密码
timeout: 30
master_only: False

logging:
loglevel: INFO
logfile:
logformat: default
blacklist: ['elasticsearch', 'urllib3']

删除

新建执行动作文件 delete_indices.yml,比如我执行删除7天前的索引

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
---
actions:
1:
action: delete_indices
description: "delete the index 7 days ago"
filters:
- filtertype: pattern
kind: prefix
value: filebeat-nginx-error-
- filtertype: age
source: name
direction: older
timestring: '%Y.%m.%d'
unit: days
unit_count: 7

执行

1
curator --config /etc/curator/config.yml /etc/curator/action/delete_indices.yml

执行结果如下:


未完待续

安装

全局安装 flake8

1
/usr/local/bin/python3 -m pip install -U flake8

全局安装 black

1
/usr/local/bin/python3 -m pip install -U black

配置

1
2
3
4
5
6
"python.languageServer": "Pylance",
"python.linting.flake8Enabled": true,
"python.formatting.provider": "black",
"python.linting.pylintEnabled": false, // 关闭pylint工具
"python.formatting.blackPath": "/usr/local/bin/black",
"python.linting.flake8Path": "/usr/local/bin/flake8",

flake8black 可选参数:

1
2
3
4
5
6
7
"python.linting.flake8Args": [
"--max-line-length=248", // 设置单行最长字符限制
"--ignore=W191, E266, W504"
],
"python.formatting.blackArgs": [
"--line-length=128" // 格式化时单行最长长度
],

排序 import 语句

1
/usr/local/bin/python3 -m pip install -U isort

配置

1
2
3
4
5
"[python]": {
"editor.codeActionsOnSave": {
"source.organizeImports": true
}
},

也可以配置用于全局

1
2
3
"editor.codeActionsOnSave": {
"source.organizeImports": true
},

前记

在一些软件中如 ES,时间格式都是 UTC 时间格式。记一记如何将 UTC 时间格式转换为本地北京时间格式。

原理很简单:将UTC转化为datetime时间格式->将转化的datetime时间加8小时->格式化为想要的格式

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import datetime

'''
UTC转北京时间
'''

def utc_format(utc_time, utc_time_format='%Y-%m-%dT%H:%M:%S.000Z'):
utc_datetime = datetime.datetime.strptime(utc_time, utc_time_format)
local_datetime = utc_datetime + datetime.timedelta(hours=8)
local_time_format = "%Y-%m-%d %H:%M:%S"
local_time = local_datetime.strftime(local_time_format)
return local_time

print(utc_format(utc_time="2021-10-26T09:34:31.000Z"))

前言

ELK 对于日志管理来说毫无疑问是最好的选择,但有的时候觉得 Logstash 比较的笨重,相反 Filebeat 也不失为一个好的选择。

此次安装的版本为 7.15.0

安装

对于几个组件的安装都是非常的简单,可以直接利用官方打好的包就好

Elasticsearch 安装文档

Kibana 安装文档

Filebeat 安装文档

安装时建议通过 RPM 包安装,这样能固定版本减少一些兼容性问题如:

1
2
3
wget https://artifacts.elastic.co/downloads/kibana/kibana-7.15.0-x86_64.rpm
shasum -a 512 kibana-7.15.0-x86_64.rpm
sudo rpm --install kibana-7.15.0-x86_64.rpm

配置

Elasticsearch

如果只是单机版的话 Elasticsearch 配置倒是不用过多修改,接下来配置一下 Elasticsearch 开启密码访问就可以了

设置密码文档

安全配置参考

1
xpack.security.enabled: true # 普通的安全设置

最后再通过下面命令设置各个用户的密码

1
$ /usr/share/elasticsearch/bin/elasticsearch-setup-passwords auto

上面命令会设置各个内置用户的密码

内置用户文档

Kibana

Kibana 只需要配置如下几个项目即可

1
2
3
4
5
6
server.port: 5601 # 端口
server.host: "0.0.0.0" # 允许远程连接
elasticsearch.hosts: ["http://localhost:9200"] # ES的地址
elasticsearch.username: "kibana_system" # 上面设置的用户和密码
elasticsearch.password: "password"
i18n.locale: "zh-CN" # 页面支持中文

Filebeat

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
output.elasticsearch:
# Array of hosts to connect to.
hosts: ["localhost:9200"]
username: "elastic"
password: "elastic"
index: "filebeat-%{+yyyy.MM.dd}"
indices:
- index: "filebeat-nginx-access-%{+yyyy.MM.dd}"
when.equals:
fields.type: nginx.access
- index: "filebeat-nginx-error-%{+yyyy.MM.dd}"
when.equals:
fields.type: nginx.error

setup.ilm.enabled: false # 索引生命周期
setup.ilm.check_exists: false
setup.template.enabled: true # 索引模版
setup.template.name: "filebeat"
setup.template.pattern: "filebeat-*"

对于 Filebeat 的设置,这里的 username 需要用 elastic 用户。如果用 beats_system 用户的话会提示 403 无权限。

具体可以参考:Security error with beats_system account and Filebeat with system module

Filebeat Module

nginx 为例打开 nginx.accessnginx.error

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
vi /etc/filebeat/modules.d/nginx.yml

access:
enabled: true
var.paths: ["/var/log/nginx/access.log"]
input:
fields:
type: nginx.access

error:
enabled: true
var.paths: ["/var/log/nginx/error.log"]
input:
fields:
type: nginx.error

索引

进入 Kibana 管理页面,新建索引
Management->Stack Management->索引模式->创建索引模式

Pipeline

配置到索引时就已经可以在 Kibana 中看到有 nginx 默认的日志进来,但如果我们有自定义的日志格式,就需要用到 Pipeline

Pipeline 所存放的位置 /usr/share/filebeat/module/nginx/access/ingest/pipeline.yml

也可通过开发工具查询 GET _ingest/pipeline/filebeat-7.15.0-nginx-access-pipeline

Grok Debugger

前言

最近遇到一个需求,每 60S 刷新数据库数据到 Redis 缓存中,但应用又不止一个进程。此需求中对原子性并无太大的要求,只是如果每次只有一个进程执行,那么数据库的压力就会小很多。

于是想到用分布式锁来解决这个问题,最终选择了通过 Redis 来实现。

关于 Redis 实现分布式锁可以查看下面的文章:
分布式锁的实现之 redis 篇

Aioredlock

在众多的官方推荐的分布式锁客户端中,我选择了 Aioredlock

Distributed locks with Redis

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
async def aioredlock():
# retry_count 重试次数,这里配置为1,杜绝进程重复去获取
lock_manager = Aioredlock([{'host': 'localhost', 'port': 6379, 'db': 1}],retry_count=1)

while True:

# 每秒钟去检测锁的状态,如果锁被其他进程占用就跳过
# lock_timeout 锁的时间为 65S,相当于进程拿到锁后最大65秒后释放
# 业务逻辑执行 小于 65秒,业务逻辑执行完后当即释放锁
# 每60秒执行一次 sleep 59s即可,因为检测已经sleep了1s

await asyncio.sleep(1)
if await lock_manager.is_locked("_ztsg_auto_generate_code"):
#logger.info('The resource is already acquired')
continue

try:
async with await lock_manager.lock("_ztsg_auto_generate_code", lock_timeout=65) as lock:

assert lock.valid is True
assert await lock_manager.is_locked("_ztsg_auto_generate_code") is True

# 每60s钟运行一次任务
await asyncio.sleep(59)

logger.info('Start Tick! The time is '+str(os.getpid())+': %s' % datetime.now())
# 业务代码
logger.info('Stop Tick! The time is '+str(os.getpid())+': %s' % datetime.now())
except LockAcquiringError:
print('Something happened during normal operation. We just log it.')
except LockError:
print('Something is really wrong and we prefer to raise the exception')
except Exception as e:
print(e)
print('Something else went wrong')

通过上面代码就能实现,需求所要求

前记

在使用 Tortoise Orm 的过程中发现数据库自动插入、更新的时间是 UTC 时区时间,通过官网文档发现可以在连接时对时区进行设置

修改

通过连接配置来修改时区,默认情况下的连接配置

1
2
3
register_tortoise(
app, db_url=db_url, modules={"models": ["app.models"]}, generate_schemas=False
)

如果需要修改配置,则不能用 db_url 模式连接需改为 config 模式连接,配置如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
register_tortoise(
app,
config={
'connections': {
'default': db_url
},
'apps': {
'models': {
"models": ["app.models"],
'default_connection': 'default',
}
},
"use_tz": False,
"timezone": "Asia/Shanghai",
},
generate_schemas=False
)

通过以上配置就能将数据库时区设置为上海时区

前言

当原子系统很多 (非微服务),而且各个原子系统之间需要相互调用,这时就需要保证两个系统之间的认证、以及数据加密。

这个时候就需要用到对称加解密了

选择

对于如何选择合适的加密算法,可以参考一下下面这篇文章

如何选择 AES 加密模式(CBC ECB CTR OCB CFB)?

最后我选择了两种方式分别来测试和实现 CBCOCB

另外 pycrypto 已经不再安全,建议使用 pycryptodome,它是 pycrypto 的分支,在安全性方面有较大提升。

1
pip install pycryptodome

CBC

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import json
from base64 import b64encode, b64decode
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from Crypto.Random import get_random_bytes

class AesCbc:
def __init__(self, key):
self.bs = AES.block_size
self.key = key
self.mode = AES.MODE_CBC

def encrypt(self, data):
cipher = AES.new(self.key, self.mode)
ct_bytes = cipher.encrypt(pad(data, self.bs))
iv = b64encode(cipher.iv).decode('utf-8')
ct = b64encode(ct_bytes).decode('utf-8')
return json.dumps({'iv': iv, 'ciphertext': ct})

def decrypt(self, json_input):
try:
b64 = json.loads(json_input)
iv = b64decode(b64['iv'])
ct = b64decode(b64['ciphertext'])
cipher = AES.new(self.key, self.mode, iv)
plaintext = unpad(cipher.decrypt(ct), self.bs)
return plaintext.decode('utf-8')
except (ValueError, KeyError) as err:
print("Incorrect decryption ", err)
return None

if __name__ == "__main__":
data = "需要加密的数据".encode('utf-8')
key = get_random_bytes(16) # Bytes

aes_cipher = AesCbc(key)

encrypt_reuslt = aes_cipher.encrypt(data)
print("密文: ", encrypt_reuslt)

plaintext = aes_cipher.decrypt(encrypt_reuslt)
print("原文: ", plaintext)

结果:

1
2
密文:  {"iv": "K8xL41sI3UoXaeWohUuZEA==", "ciphertext": "fLGcOq43vTZc9x3HX8Q9Nv82cwVT6WNTj5mcpuPEckw="}
原文: 需要加密的数据

OCB

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import json
from base64 import b64encode, b64decode
from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes

class AesOcb:
def __init__(self, key):
self.bs = AES.block_size
self.key = key
self.mode = AES.MODE_OCB
self.json_k = ['nonce', 'header', 'ciphertext', 'tag']

def encrypt(self, header, data):
header = header
cipher = AES.new(self.key, self.mode)
cipher.update(header)
ciphertext, tag = cipher.encrypt_and_digest(data)
json_v = [b64encode(x).decode('utf-8') for x in [cipher.nonce, header, ciphertext, tag]]
return json.dumps(dict(zip(self.json_k, json_v)))

def decrypt(self, json_input):
try:
b64 = json.loads(json_input)
jv = {k: b64decode(b64[k]) for k in self.json_k}
cipher = AES.new(self.key, self.mode, nonce=jv['nonce'])
cipher.update(jv['header'])
plaintext = cipher.decrypt_and_verify(jv['ciphertext'], jv['tag'])
return plaintext.decode('utf-8')
except (ValueError, KeyError) as err:
# 解密错误
print(err)
return None

if __name__ == "__main__":
data = "需要加密的数据".encode('utf-8')
key = get_random_bytes(16) # Bytes
header = b'header'

aes_cipher = AesOcb(key)

encrypt_reuslt = aes_cipher.encrypt(header, data)
print("密文: ", encrypt_reuslt)

plaintext = aes_cipher.decrypt(encrypt_reuslt)
print("原文: ", plaintext)

结果:

1
2
密文:  {"nonce": "9Wd6sA1QGSdjXHu1zACA", "header": "aGVhZGVy", "ciphertext": "tyaCFrLuriy6F3xJqs0CehNWe3g7", "tag": "IqDrP9zX00aZMRe7DuCRzQ=="}
原文: 需要加密的数据
0%