In [77]:
import duckdb

# Open (or create) the on-disk DuckDB database file used by the cells below.
con = duckdb.connect('local.duckdb')
# `local.duckdb` persists between kernel sessions, so a plain CREATE TABLE
# raises "table already exists" on every re-run. CREATE OR REPLACE makes this
# cell idempotent: `test` is rebuilt from weblog.csv each time it runs, which
# also discards synthetic rows appended to it by a previous run.
con.sql("CREATE OR REPLACE TABLE test AS SELECT * FROM 'weblog.csv'")

# 新增1000万数据用于测试

In [78]:
from faker import Faker
import random
import uuid
from datetime import datetime, timedelta
import pandas as pd

# --- Synthetic operation-log generation ---------------------------------------
# Appends `n_records` fake rows to the `test` table through the open DuckDB
# connection `con`. Rows are produced and inserted in batches so that only
# `batch_size` Python row objects are alive at any time, instead of all
# `n_records` at once.

# Seed both RNGs so the generated names, ids, IPs, operations and timestamps
# are the same on every run. (uuid.uuid4() draws from the OS entropy source
# and is therefore still different on each run.)
SEED = 42
random.seed(SEED)
Faker.seed(SEED)

# Initialize the Faker library
fake = Faker('zh_CN')

# Number of unique users and departments
n_users = 100  # Number of unique usernames
n_departments = 5
n_records = 10000000  # Number of records to generate
batch_size = 500_000  # Rows built in memory per INSERT; lower it if RAM is tight

# Loop-invariant values, built once instead of once per generated row.
OPERATION_TYPES = ["登录", "登出", "添加", "删除", "编辑", "查看", "上传", "下载", "分享", "评论", "点赞", "收藏", "回复", "更新", "搜索", "排序", "过滤", "分配", "创建", "发布", "撤销", "审核", "激活", "禁用", "备份", "恢复", "重置"]
COLUMNS = ["uuid", "operation_ip", "username", "user_id", "operation_type", "department", "operation_time"]
START_TIME = datetime(2023, 1, 1)
MAX_OFFSET_SECONDS = 86400 * 30  # timestamps fall within 30 days of START_TIME

# Generate usernames and departments. `fake.unique` guarantees no repeats;
# plain fake.name() / fake.company() may return the same value twice.
unique_users = [fake.unique.name() for _ in range(n_users)]
departments = [fake.unique.company() for _ in range(n_departments)]

# Give every username between one and three user_ids, unique across all users.
username_to_ids = {
    username: [fake.unique.random_number(digits=4) for _ in range(random.randint(1, 3))]
    for username in unique_users
}

# Map each user_id to a single department
user_id_to_department = {
    user_id: random.choice(departments)
    for ids in username_to_ids.values()
    for user_id in ids
}


def _random_row():
    """Return one synthetic log row as a list whose order matches COLUMNS."""
    username = random.choice(unique_users)
    user_id = random.choice(username_to_ids[username])
    return [
        uuid.uuid4(),
        fake.ipv4_private(),
        username,
        user_id,
        random.choice(OPERATION_TYPES),
        user_id_to_department[user_id],
        START_TIME + timedelta(seconds=random.randint(0, MAX_OFFSET_SECONDS)),
    ]


# Name the target columns explicitly so the insert does not depend on the
# column order weblog.csv happened to give the `test` table.
_column_list = ", ".join(f'"{name}"' for name in COLUMNS)
_insert_sql = f"INSERT INTO test ({_column_list}) SELECT {_column_list} FROM df_random"

# One transaction around all batches: if the cell is interrupted or fails,
# nothing is committed, matching the original all-or-nothing single INSERT.
con.begin()
try:
    for batch_start in range(0, n_records, batch_size):
        rows_in_batch = min(batch_size, n_records - batch_start)
        # `df_random` must stay a top-level variable: DuckDB resolves the name
        # used in the SQL text by looking it up in the Python scope.
        # After the loop it holds only the last batch.
        df_random = pd.DataFrame(
            [_random_row() for _ in range(rows_in_batch)],
            columns=COLUMNS,
        )
        con.sql(_insert_sql)
except BaseException:
    # BaseException so that a KeyboardInterrupt also rolls back.
    con.rollback()
    raise
else:
    con.commit()

KeyboardInterrupt: 

# 部分正确：LEAD 只比较同一 user_id 下按时间排序后相邻的两条记录，会漏掉 3 秒内不相邻的记录对

In [None]:
# Export to RESULT.CSV every row of `test` whose immediately following row for
# the same user_id (ordered by operation_time) has a different operation_ip
# and follows within 0-3 seconds. LEAD() only looks at the directly adjacent
# row, so pairs that are not consecutive are not reported by this query.
# Fix: the COPY option was misspelled `delimter`, which DuckDB rejects as an
# unrecognized option; it is now `delimiter`.
con.sql("""
    COPY (
       WITH ordered_ops AS (
    SELECT 
        *,
        LEAD(uuid) OVER (PARTITION BY user_id ORDER BY operation_time) AS next_uuid,
        LEAD(operation_ip) OVER (PARTITION BY user_id ORDER BY operation_time) AS next_ip,
        LEAD(operation_type) OVER (PARTITION BY user_id ORDER BY operation_time) AS next_opera,
        LEAD(operation_time) OVER (PARTITION BY user_id ORDER BY operation_time) AS next_time
    FROM 
        test
)
SELECT 
    username,
    user_id,
    department,
    uuid AS pre_uuid,
    next_uuid,
    operation_type AS pre_opera,
    next_opera,
    operation_time AS pre_time,
    next_time,
    operation_ip AS pre_ip,
    next_ip
FROM 
    ordered_ops
WHERE 
    next_ip IS NOT NULL AND
    next_ip != operation_ip AND
    DATEDIFF('second', operation_time, next_time) BETWEEN 0 AND 3
ORDER BY 
    user_id, operation_time
) TO 'RESULT.CSV' (header,delimiter ',');
""")

# 完全正确：通过自连接比较同一 user_id 下 3 秒内的所有记录对（而不仅是相邻记录）

In [None]:
# Export to RESULT2.CSV every pair of rows (A, B) in `test` that share a
# user_id, where A is strictly earlier than B, B is at most 3 seconds after A
# (as counted by DATEDIFF), and the two rows have different operation_ip.
# Being a self-join, it compares all such pairs, not only adjacent rows.
# Fix: the COPY option was misspelled `delimter`, which DuckDB rejects as an
# unrecognized option; it is now `delimiter`.
con.sql("""COPY (
        SELECT A.username,
       A.user_id,
       A.department,
       A.operation_type AS pre_opera,
       B.operation_type AS next_opera,
       A.operation_time AS pre_time,
       B.operation_time AS next_time,
       A.operation_ip AS pre_ip,
       B.operation_ip AS next_ip
FROM test A, test B
WHERE A.user_id = B.user_id
  AND A.operation_time < B.operation_time
  AND DATEDIFF('second', A.operation_time, B.operation_time) <= 3
  AND A.operation_ip != B.operation_ip
ORDER BY A.user_id, A.operation_time
) TO 'RESULT2.CSV' (header,delimiter ',');""")

In [79]:
# Close the DuckDB connection; any cell that uses `con` must run before this one.
con.close()