QQ群数据集处理笔记

下载

搭建

https://www.52pojie.cn/thread-1813624-1-1.html

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
-- 打开 开始-->程序-->Microsoft SQL Server-->查询分析器

-- 附加群号数据库:

EXEC sp_attach_db "QunInfo1", "G:\数据库所在目录\QunData\QunInfo1_Data.MDF","G:\数据库所在目录\QunData\QunInfo1_Log.LDF"
EXEC sp_attach_db "QunInfo2", "G:\数据库所在目录\QunData\QunInfo2_Data.MDF","G:\数据库所在目录\QunData\QunInfo2_Log.LDF"
EXEC sp_attach_db "QunInfo3", "G:\数据库所在目录\QunData\QunInfo3_Data.MDF","G:\数据库所在目录\QunData\QunInfo3_Log.LDF"
EXEC sp_attach_db "QunInfo4", "G:\数据库所在目录\QunData\QunInfo4_Data.MDF","G:\数据库所在目录\QunData\QunInfo4_Log.LDF"
EXEC sp_attach_db "QunInfo5", "G:\数据库所在目录\QunData\QunInfo5_Data.MDF","G:\数据库所在目录\QunData\QunInfo5_Log.LDF"
EXEC sp_attach_db "QunInfo6", "G:\数据库所在目录\QunData\QunInfo6_Data.MDF","G:\数据库所在目录\QunData\QunInfo6_Log.LDF"
EXEC sp_attach_db "QunInfo7", "G:\数据库所在目录\QunData\QunInfo7_Data.MDF","G:\数据库所在目录\QunData\QunInfo7_Log.LDF"
EXEC sp_attach_db "QunInfo8", "G:\数据库所在目录\QunData\QunInfo8_Data.MDF","G:\数据库所在目录\QunData\QunInfo8_Log.LDF"
EXEC sp_attach_db "QunInfo9", "G:\数据库所在目录\QunData\QunInfo9_Data.MDF","G:\数据库所在目录\QunData\QunInfo9_Log.LDF"
EXEC sp_attach_db "QunInfo10", "G:\数据库所在目录\QunData\QunInfo10_Data.MDF","G:\数据库所在目录\QunData\QunInfo10_Log.LDF"
EXEC sp_attach_db "QunInfo11", "G:\数据库所在目录\QunData\QunInfo11_Data.MDF","G:\数据库所在目录\QunData\QunInfo11_Log.LDF"

附加群成员数据库:

EXEC sp_attach_db "GroupData1", "G:\数据库所在目录\QunData\GroupData1_Data.MDF","G:\数据库所在目录\QunData\GroupData1_Log.LDF"
EXEC sp_attach_db "GroupData2", "G:\数据库所在目录\QunData\GroupData2_Data.MDF","G:\数据库所在目录\QunData\GroupData2_Log.LDF"
EXEC sp_attach_db "GroupData3", "G:\数据库所在目录\QunData\GroupData3_Data.MDF","G:\数据库所在目录\QunData\GroupData3_Log.LDF"
EXEC sp_attach_db "GroupData4", "G:\数据库所在目录\QunData\GroupData4_Data.MDF","G:\数据库所在目录\QunData\GroupData4_Log.LDF"
EXEC sp_attach_db "GroupData5", "G:\数据库所在目录\QunData\GroupData5_Data.MDF","G:\数据库所在目录\QunData\GroupData5_Log.LDF"
EXEC sp_attach_db "GroupData6", "G:\数据库所在目录\QunData\GroupData6_Data.MDF","G:\数据库所在目录\QunData\GroupData6_Log.LDF"
EXEC sp_attach_db "GroupData7", "G:\数据库所在目录\QunData\GroupData7_Data.MDF","G:\数据库所在目录\QunData\GroupData7_Log.LDF"
EXEC sp_attach_db "GroupData8", "G:\数据库所在目录\QunData\GroupData8_Data.MDF","G:\数据库所在目录\QunData\GroupData8_Log.LDF"
EXEC sp_attach_db "GroupData9", "G:\数据库所在目录\QunData\GroupData9_Data.MDF","G:\数据库所在目录\QunData\GroupData9_Log.LDF"
EXEC sp_attach_db "GroupData10", "G:\数据库所在目录\QunData\GroupData10_Data.MDF","G:\数据库所在目录\QunData\GroupData10_Log.LDF"
EXEC sp_attach_db "GroupData11", "G:\数据库所在目录\QunData\GroupData11_Data.MDF","G:\数据库所在目录\QunData\GroupData11_Log.LDF"


拆分数据库(不会删除数据库文件):

拆分群号数据库

exec sp_detach_db QunInfo1, true
exec sp_detach_db QunInfo2, true
exec sp_detach_db QunInfo3, true
exec sp_detach_db QunInfo4, true
exec sp_detach_db QunInfo5, true
exec sp_detach_db QunInfo6, true
exec sp_detach_db QunInfo7, true
exec sp_detach_db QunInfo8, true
exec sp_detach_db QunInfo9, true
exec sp_detach_db QunInfo10, true
exec sp_detach_db QunInfo11, true


拆分群成员数据库

exec sp_detach_db GroupData1, true
exec sp_detach_db GroupData2, true
exec sp_detach_db GroupData3, true
exec sp_detach_db GroupData4, true
exec sp_detach_db GroupData5, true
exec sp_detach_db GroupData6, true
exec sp_detach_db GroupData7, true
exec sp_detach_db GroupData8, true
exec sp_detach_db GroupData9, true
exec sp_detach_db GroupData10, true
exec sp_detach_db GroupData11, true

迁移到duckdb

使用pymysql连接sql server 2008 r2数据库踩的坑,始终连接报错DB-Lib error message 20002, severity 9 解决方案: https://cloud.tencent.com/developer/article/2376561 最终代码:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import gc
from time import time

import duckdb
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.engine.url import make_url
from sqlalchemy.sql import text
from tqdm import tqdm

# 函数定义


def get_tables_in_schema(schema, engine):
    """获取当前数据库中指定 schema 下的所有表"""
    sql = f"""SELECT tab.name as [table]
              FROM sys.tables tab
              WHERE schema_name(tab.schema_id) = '{schema}'"""
    df = pd.read_sql(sql, engine)
    return df["table"].values.tolist()


def export_table_to_duckdb(schema, table, con, duckdb_con):
    """将给定的表数据从 SQL Server 导出并写入 DuckDB"""
    print(f"开始导出 {table} 到 DuckDB...")

    if table.lower() == "group":
        table = f"[{table}]"  # 如果表名是保留关键字,使用方括号

    # 获取表的总行数
    total_rows = con.execute(text(f"SELECT COUNT(*) FROM {schema}.{table}")).fetchone()[
        0
    ]
    print(f"表: {table}, 总行数: {total_rows}")
    if total_rows > 0:
        # 初始化进度条
        progress_bar = tqdm(total=total_rows, desc=f"导出 {table}", unit="行")

        lines = 0

        if "Group" in table:
            sql = f"SELECT QQNum, Nick, Age, Gender, Auth, QunNum FROM {schema}.{table}"  # 不再使用 TOP 限制,只是做小数据量测试
        elif "Qun" in table:
            sql = f"SELECT CAST(QunNum AS BIGINT) QunNum,CAST(MastQQ AS BIGINT) MastQQ,CAST(CreateDate AS DATE)  CreateDate, Title,CAST(Class AS BIGINT)  Class, QunText FROM {schema}.{table}"

        for i, df in enumerate(
            pd.read_sql(sql, con, chunksize=5000000)
        ):  # 逐块读取 100k 行
            t_step = time()

            # 在 DuckDB 中创建表结构(如果不存在)
            if "Group" in table:
                duckdb_con.execute(
                    "CREATE TABLE IF NOT EXISTS GroupInfo (QQNum UBIGINT, Nick VARCHAR, Age INT, Gender INT, Auth INT, QunNum UBIGINT)"
                )
            elif "Qun" in table:
                duckdb_con.execute(
                    "CREATE TABLE IF NOT EXISTS QunInfo (QunNum UBIGINT, MastQQ UBIGINT, CreateDate DATE, Title VARCHAR, Class UBIGINT, QunText VARCHAR)"
                )

            # 将数据插入 DuckDB 数据库
            duckdb_con.execute(
                "INSERT INTO GroupInfo BY NAME (SELECT * FROM df);"
                if "Group" in table
                else "INSERT INTO QunInfo BY NAME (SELECT * FROM df);"
            )

            lines += df.shape[0]

            # 更新进度条
            progress_bar.update(df.shape[0])
            progress_bar.set_postfix(
                {
                    "本次导入耗时": f"{round(time() - t_step, 2)} 秒",
                }
            )
            # 强制清理内存
            del df
            gc.collect()

        # 关闭进度条
        progress_bar.close()


# 数据库连接信息
db = {
    "host": "127.0.0.1",
    "port": "1433",
    "user": "sa",
    "password": "xxx",  # 请填写实际密码
}

# 创建连接到 master 数据库的引擎
url = make_url(
    f"mssql+pymssql://{db['user']}:{db['password']}@{db['host']}:{db['port']}/master"
)
engine = create_engine(url)
con = engine.connect()

# 获取所有数据库的名称(排除系统数据库,如 master、tempdb、model 等)
query = "SELECT name FROM sys.databases WHERE name NOT IN ('master', 'tempdb', 'model', 'msdb')"
databases_df = pd.read_sql(query, con)

# 输出所有数据库的名称
databases_list = databases_df["name"].tolist()
print("Available databases:", databases_list)


# 遍历每个数据库
for db_name in databases_list:
    print(f"\nSwitching to database: {db_name}")
    # 动态更改连接字符串,切换到当前数据库
    new_url = url.set(database=db_name)
    engine = create_engine(new_url)  # 为每个数据库创建新的引擎
    con = engine.connect().execution_options(stream_results=True)  # 支持流式返回

    # 连接到 DuckDB 数据库
    duckdb_con = duckdb.connect(r"F:\QQ_dataset.duckdb")

    # 获取表格列表并导出
    schema = "dbo"  # 假设所有数据库都使用 'dbo' 模式
    for table in get_tables_in_schema(schema, engine):
        if "Group" in table or "Qun" in table:
            print(f"Exporting table: {table}")
            export_table_to_duckdb(schema, table, con, duckdb_con)
    # 关闭 DuckDB 连接
    duckdb_con.close()
    # 关闭当前连接
    con.close()

以上代码运行需要半天时间

SQL server中储存中文字符是使用排序规则来设置字符串编码的,默认是Chinese_PRC_CI_AS,在duckdb中默认是utf8mb4_general_ci,所以duckdb中中文乱码。 但是数据已经到duckdb中,不可能重新导入。 先为每条记录增加PRIMARY KEY,方便数据处理:

1
2
3
4
CREATE SEQUENCE group_serial;
ALTER TABLE GroupInfo ADD COLUMN id INTEGER DEFAULT nextval('group_serial');
CREATE SEQUENCE qun_serial;
ALTER TABLE QunInfo ADD COLUMN id INTEGER DEFAULT nextval('qun_serial');

经部分测试可以在迁移时通过在url中加入参数?encoding=gbk来修复中文编码问题。

1
url = ake_url(f"mssql+pymssql://{db['user']}:{db['password']}@{db['host']}:{db['port']}/master?charset=GBK")

下面代码是对已经迁移完成的数据进行编码修复(耗时9154.48 秒):

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import time  # 导入 time 用于耗时计算

import duckdb
from tqdm import tqdm  # 导入 tqdm 用于进度条


# 定义自定义的编码修复函数
def fix_gbk_encoding(text):
    try:
        return text.encode("latin1").decode("gbk")
    except (UnicodeEncodeError, UnicodeDecodeError):
        return text  # 如果转换失败,保留原始数据


# 获取总行数(通过 currval 获取)
def get_total_rows(duckdb_con, table_name, serial_column):
    query = f"SELECT currval('{serial_column}') AS currval;"
    return duckdb_con.execute(query).fetchone()[0]


# 执行批量处理的函数
def process_batch(duckdb_con, table_name, column_names, id_column, start_id, end_id):
    # 构造 UPDATE 语句,使用 ID BETWEEN AND 范围
    update_query = f"""
    UPDATE {table_name}
    SET {', '.join([f'{col} = fix_gbk_encoding({col})' for col in column_names])}
    WHERE {id_column} BETWEEN {start_id} AND {end_id};
    """
    # 执行批量更新
    duckdb_con.execute(update_query)


# 开始代码执行总耗时计时
start_total_time = time.time()


# ========== 处理 GroupInfo 表 ==========
duckdb_con = duckdb.connect("F:/QQ_dataset.duckdb")

# 注册函数
duckdb_con.create_function("fix_gbk_encoding", fix_gbk_encoding, [str], str)
# 1. 获取 GroupInfo 表总行数的耗时
start_time = time.time()
total_rows_group = get_total_rows(duckdb_con, "GroupInfo", "group_serial")
time_get_total_group = time.time() - start_time
print(f"获取 GroupInfo 表总行数耗时:{time_get_total_group:.2f} 秒")

# 定义一个批量更新的大小
batch_size = 5000000  # 每次处理 100000 行
reconnect_threshold = batch_size * 10  # 处理 1000000 行时重新连接
progress_bar_group = tqdm(
    total=total_rows_group, desc="修复 GroupInfo 表的 Nick 列", unit="行"
)

# 2. 开始 GroupInfo 表分批处理的耗时
start_group_time = time.time()

# 分批处理 GroupInfo 表数据,使用 ID BETWEEN AND
for start_id in range(1, total_rows_group + 1, batch_size):
    end_id = min(start_id + batch_size - 1, total_rows_group)
    process_batch(duckdb_con, "GroupInfo", ["Nick"], "id", start_id, end_id)

    # 更新进度条
    progress_bar_group.update(end_id - start_id + 1)

    # 每处理 reconnect_threshold 条记录,重新连接以释放内存
    if start_id % reconnect_threshold == 1 and start_id > 1:
        duckdb_con.close()  # 关闭当前连接以释放内存
        duckdb_con = duckdb.connect("F:/QQ_dataset.duckdb")  # 重新连接
        duckdb_con.create_function(
            "fix_gbk_encoding", fix_gbk_encoding, [str], str
        )  # 重新注册函数

# 关闭进度条
progress_bar_group.close()

# 3. 记录 GroupInfo 表总耗时
time_total_group = time.time() - start_group_time
print(f"GroupInfo 表总耗时:{time_total_group:.2f} 秒")

# ========== 处理 QunInfo 表 ==========
# 1. 获取 QunInfo 表总行数的耗时
start_time = time.time()
total_rows_qun = get_total_rows(duckdb_con, "QunInfo", "qun_serial")
time_get_total_qun = time.time() - start_time
print(f"获取 QunInfo 表总行数耗时:{time_get_total_qun:.2f} 秒")

# 初始化进度条
progress_bar_qun = tqdm(
    total=total_rows_qun, desc="修复 QunInfo 表的 Title 和 QunText 列", unit="行"
)

# 2. 开始 QunInfo 表分批处理的耗时
start_qun_time = time.time()

# 分批处理 QunInfo 表数据,使用 ID BETWEEN AND
for start_id in range(1, total_rows_qun + 1, batch_size):
    end_id = min(start_id + batch_size - 1, total_rows_qun)
    process_batch(duckdb_con, "QunInfo", ["Title", "QunText"], "id", start_id, end_id)

    # 更新进度条
    progress_bar_qun.update(end_id - start_id + 1)

    # 每处理 reconnect_threshold 条记录,重新连接以释放内存
    if start_id % reconnect_threshold == 1 and start_id > 1:
        duckdb_con.close()  # 关闭当前连接以释放内存
        duckdb_con = duckdb.connect("F:/QQ_dataset.duckdb")  # 重新连接
        duckdb_con.create_function(
            "fix_gbk_encoding", fix_gbk_encoding, [str], str
        )  # 重新注册函数

# 关闭进度条
progress_bar_qun.close()

# 3. 记录 QunInfo 表总耗时
time_total_qun = time.time() - start_qun_time
print(f"QunInfo 表总耗时:{time_total_qun:.2f} 秒")

# ========== 计算代码执行的总耗时 ==========

total_time = time.time() - start_total_time
print(f"代码执行总耗时:{total_time:.2f} 秒")

# 关闭最后一个连接
duckdb_con.close()

print("数据修复完成。")

数据去重

groupinfo表中存在重复数据,需要去重。 但是该表有28+亿行数据,直接全表去重,内存不够,虽然duckdb当内存不够时会使用磁盘辅助,但是对磁盘读写速度影响很大,所以还不是最优方案。 思路是:分批次去重。但如果基于前面增加的id字段来分批,则重复数据可能会分散在不同批次中,无法一次性删除。解决办法:以qunnum分批,因为去重主要是针对qunnum和qqnum组合去重,而以qunnum分批,则重复数据会集中在同一批次中,可以一次性删除。但是一条qunnum可能对应多个qqnum,所以需要先统计各个qunnum对应的行数,然后根据行数分批,这样重复数据会集中在同一批次中,可以一次性删除。

1
create table qunnumcount as select qunnum ,count(*) AS record_count from groupinfo group by qunnum;

第二步,使用以下脚本分批次去重[花费12小时左右]。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import pandas as pd
import duckdb
from IPython.display import clear_output

con = duckdb.connect(database=r"F:\QunData.duckdb")

# 获取每个 QunNum 对应的记录数
query = "select * from qunnumcount;"
df = con.execute(query).df()

# 分批处理
batch_size = 1000000
current_batch = []
current_count = 0
batches = []

for index, row in df.iterrows():
    current_batch.append(row['QunNum'])
    current_count += row['record_count']
    
    if current_count >= batch_size:
        batches.append(current_batch)
        current_batch = []
        current_count = 0

# 处理最后的剩余数据
if current_batch:
    batches.append(current_batch)
del df
# 按照分批次执行SQL
for batch in batches:
    qunnum_min = min(batch)
    qunnum_max = max(batch)
    sql = f"""
    DELETE FROM groupinfo WHERE id IN (WITH RankedGroupInfo AS (
        SELECT id,
               ROW_NUMBER() OVER (PARTITION BY QQNum, QunNum,Nick ORDER BY id ASC) AS rn
        FROM groupinfo
        WHERE QunNum BETWEEN {qunnum_min} AND {qunnum_max}
    )
    SELECT id
        FROM RankedGroupInfo
        WHERE rn > 1
    );
    """
    con.execute(sql)
con.close()