Data Analysis: Mining Risk Data for Stolen (Shared) Accounts

uuid operation_ip username user_id operation_type department operation_time
ffds 192.168.1.8 张三 1818 delete Sales 2023/01/02 09:21:56
ddsf 192.168.1.8 张三 1818 logout Sales 2023/01/02 09:21:57
1dd1 192.168.4.246 张三 1818 delete Sales 2023/01/02 09:21:59
1df6 192.168.6.27 张三 2222 view Finance 2023/01/06 12:09:24
1fs7 192.168.2.7 张三 2222 login Finance 2023/01/07 17:08:20
1d18 192.168.2.7 张三 2222 logout Finance 2023/01/07 17:08:22
1ds9 192.168.4.115 张三 2222 logout Finance 2023/01/07 17:08:23
The above is a hand-crafted sample (it guarantees that there is risk data to be mined).
Import the sample data:
import duckdb

# Create (or open) a local DuckDB database file and load the sample CSV into a table
con = duckdb.connect('local.duckdb')
con.sql("CREATE TABLE test AS SELECT * FROM 'weblog.csv'")

The following generates 1,000,000 rows of random data with the same structure and inserts them into the test table so that performance can be tested. While the data is random, it must be ensured that:

  1. a user_id never belongs to two departments at the same time;
  2. a username may map to several user_ids, but a user_id never maps to more than one username. In other words:
  3. a person never appears in two departments at once;
  4. two people who happen to share a name have different user_ids;
  5. different names always mean different user_ids.
from faker import Faker
import random
import uuid
from datetime import datetime, timedelta
import pandas as pd

# Initialize the Faker library
fake = Faker('zh_CN')

# Number of unique users and departments
n_users = 100  # Number of unique usernames
n_departments = 5
n_records = 1000000  # Number of records to generate

# Generate unique usernames and corresponding departments
unique_users = [fake.name() for _ in range(n_users)]
departments = [fake.company() for _ in range(n_departments)]

# Generate unique user_ids for each username
username_to_ids = {username: [fake.unique.random_number(digits=4) for _ in range(random.randint(1, 3))] for username in unique_users}

# Map each user_id to a single department
user_id_to_department = {}
for username, ids in username_to_ids.items():
    for user_id in ids:
        user_id_to_department[user_id] = random.choice(departments)

# Generate the random data
random_data = []
for _ in range(n_records):
    username = random.choice(unique_users)
    user_id = random.choice(username_to_ids[username])
    department = user_id_to_department[user_id]
    operation_type = random.choice(["登录", "登出", "添加", "删除", "编辑", "查看", "上传", "下载", "分享", "评论", "点赞", "收藏", "回复", "更新", "搜索", "排序", "过滤", "分配", "创建", "发布", "撤销", "审核", "激活", "禁用", "备份", "恢复", "重置"])
    operation_ip = fake.ipv4_private()
    operation_time = datetime(2023, 1, 1) + timedelta(seconds=random.randint(0, 86400 * 30))
    # Store the UUID as a string so it matches the VARCHAR uuid column imported from the CSV
    random_data.append([str(uuid.uuid4()), operation_ip, username, user_id, operation_type, department, operation_time])
# Create a DataFrame
df_random = pd.DataFrame(random_data, columns=["uuid","operation_ip","username","user_id","operation_type","department","operation_time"])
con.sql("INSERT INTO test SELECT * FROM df_random")
The records we expect the analysis to find, i.e. operations by the same user_id from different IPs within 3 seconds of each other, are the following four pairs:

username user_id department pre_opera next_opera pre_time next_time pre_ip next_ip
张三 1818 Sales delete delete 2023/01/02 09:21:56 2023/01/02 09:21:59 192.168.1.8 192.168.4.246
张三 1818 Sales logout delete 2023/01/02 09:21:57 2023/01/02 09:21:59 192.168.1.8 192.168.4.246
张三 2222 Finance login logout 2023/01/07 17:08:20 2023/01/07 17:08:23 192.168.2.7 192.168.4.115
张三 2222 Finance logout logout 2023/01/07 17:08:22 2023/01/07 17:08:23 192.168.2.7 192.168.4.115

Finding these pairs with a straightforward self-join is essentially a Cartesian product: if the table is large, resource consumption and run time explode. In testing, 1,000,000 rows produced a result in 43.2 seconds, while 10,000,000 rows (a 348 MB database file) took 74 minutes 31.7 seconds, with the i7-8700 pinned at 97%+ utilization and memory on the 24 GB machine slowly climbing to 8-9 GB.

COPY (
    SELECT
        A.username,
        A.user_id,
        A.department,
        A.operation_type AS pre_opera,
        B.operation_type AS next_opera,
        A.operation_time AS pre_time,
        B.operation_time AS next_time,
        A.operation_ip AS pre_ip,
        B.operation_ip AS next_ip,
        A.uuid AS pre_uuid,
        B.uuid AS next_uuid
    FROM test A, test B
    WHERE A.user_id = B.user_id
      AND A.operation_time < B.operation_time
      AND DATEDIFF('second', A.operation_time, B.operation_time) <= 3
      AND A.operation_ip != B.operation_ip
    ORDER BY A.user_id, A.operation_time
) TO 'RESULT2.CSV' (HEADER, DELIMITER ',');
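To reproduce the timing figures mentioned above from Python, the same COPY statement can be wrapped in a simple stopwatch. A minimal sketch, assuming the local.duckdb database and test table created earlier; the numbers will of course vary with hardware:

import time
import duckdb

con = duckdb.connect('local.duckdb')

# Same self-join COPY as above, stored as a string so it can be timed from Python
copy_sql = """
COPY (
    SELECT
        A.username, A.user_id, A.department,
        A.operation_type AS pre_opera, B.operation_type AS next_opera,
        A.operation_time AS pre_time,  B.operation_time AS next_time,
        A.operation_ip   AS pre_ip,    B.operation_ip   AS next_ip,
        A.uuid AS pre_uuid, B.uuid AS next_uuid
    FROM test A, test B
    WHERE A.user_id = B.user_id
      AND A.operation_time < B.operation_time
      AND DATEDIFF('second', A.operation_time, B.operation_time) <= 3
      AND A.operation_ip != B.operation_ip
    ORDER BY A.user_id, A.operation_time
) TO 'RESULT2.CSV' (HEADER, DELIMITER ',')
"""

start = time.perf_counter()
con.execute(copy_sql)
print(f"Self-join export took {time.perf_counter() - start:.1f} s")
con.close()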

Since the cost of the Cartesian product depends on how much data takes part in it, and given the nature of this particular problem, we can first group the data by user_id and then run the same computation per group. In testing this took 268.5 seconds. The Python code is as follows:

import duckdb

con = duckdb.connect('local.duckdb')
# Collect the distinct user_ids so that each group can be processed on its own
userids = con.sql('SELECT DISTINCT user_id FROM test').to_df()
userids = list(userids['user_id'])
# Target table that accumulates the suspicious operation pairs
con.sql("""create table if not exists multioperat (
                    username varchar, user_id bigint, department varchar,
                    pre_uuid varchar,
                    next_uuid varchar,
                    pre_opera varchar,
                    next_opera varchar,
                    pre_time timestamp,
                    next_time timestamp,
                    pre_ip varchar,
                    next_ip varchar)
            """)
# Run the self-join once per user_id; each join only touches that user's rows
for userid in userids:
    sql_str = f"""
    INSERT INTO multioperat
            SELECT 
            A.username,
            A.user_id,
            A.department,
            A.uuid AS pre_uuid,
            B.uuid AS next_uuid,
            A.operation_type AS pre_opera,
            B.operation_type AS next_opera,
            A.operation_time AS pre_time,
            B.operation_time AS next_time,
            A.operation_ip AS pre_ip,
            B.operation_ip AS next_ip
    FROM test A, test B
    WHERE 
        A.user_id = {userid} AND 
        B.user_id = {userid} AND
        A.operation_time < B.operation_time AND 
        DATEDIFF('second', A.operation_time, B.operation_time) <= 3 AND
        A.operation_ip != B.operation_ip
    ORDER BY A.operation_time;"""
    con.sql(sql_str)
con.close()
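Once the loop has finished, the pairs accumulated in multioperat can be inspected and exported in the same way as before. A minimal sketch (the output file name multioperat.csv is illustrative):

import duckdb

con = duckdb.connect('local.duckdb')

# How many suspicious operation pairs were collected per user_id?
con.sql("""
    SELECT user_id, COUNT(*) AS pair_count
    FROM multioperat
    GROUP BY user_id
    ORDER BY pair_count DESC
    LIMIT 10
""").show()

# Export the collected pairs, mirroring the COPY ... TO pattern used above
con.execute("COPY multioperat TO 'multioperat.csv' (HEADER, DELIMITER ',')")
con.close()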

If completeness is not required, we can simply sort by user_id and operation_time and check whether each pair of adjacent operations was performed from different IPs within 3 seconds. In testing, 1,000,000 rows produced a result in 0.4 seconds and 10,000,000 rows in 6.2 seconds:

COPY (
    WITH ordered_ops AS (
        SELECT
            *,
            LEAD(uuid) OVER (PARTITION BY user_id ORDER BY operation_time) AS next_uuid,
            LEAD(operation_ip) OVER (PARTITION BY user_id ORDER BY operation_time) AS next_ip,
            LEAD(operation_type) OVER (PARTITION BY user_id ORDER BY operation_time) AS next_opera,
            LEAD(operation_time) OVER (PARTITION BY user_id ORDER BY operation_time) AS next_time
        FROM test
    )
    SELECT
        username,
        user_id,
        department,
        uuid AS pre_uuid,
        next_uuid,
        operation_type AS pre_opera,
        next_opera,
        operation_time AS pre_time,
        next_time,
        operation_ip AS pre_ip,
        next_ip
    FROM ordered_ops
    WHERE
        next_ip IS NOT NULL AND
        next_ip != operation_ip AND
        DATEDIFF('second', operation_time, next_time) BETWEEN 0 AND 3
    ORDER BY user_id, operation_time
) TO 'RESULT.CSV' (HEADER, DELIMITER ',');
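The window-function output can be spot-checked against the hand-crafted sample rows at the top of this article. A minimal sketch, assuming RESULT.CSV was written by the query above:

import duckdb

con = duckdb.connect()

# Spot-check against the hand-crafted sample: only the time-adjacent pairs
# for user_id 1818 and 2222 should show up in the window-function result.
con.sql("""
    SELECT user_id, pre_opera, next_opera, pre_time, next_time, pre_ip, next_ip
    FROM read_csv_auto('RESULT.CSV')
    WHERE user_id IN (1818, 2222)
    ORDER BY user_id, pre_time
""").show()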

Attachments: test.ipynb, weblog.csv

In the tests above, the fully correct RESULT2.CSV turned out to be only about half the size of the partially correct RESULT.CSV. Moreover, the fully correct result contains all four expected records, while the partially correct result contains only two of them, namely the two pairs that are adjacent in time.
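This comparison can be verified mechanically by lining the two exported files up against each other. A minimal sketch using DuckDB's CSV reader (column names follow the queries that produced the files):

import duckdb

con = duckdb.connect()

# Row counts of the complete (self-join) and the adjacent-only (window) results
full_rows = con.sql("SELECT COUNT(*) FROM read_csv_auto('RESULT2.CSV')").fetchall()[0][0]
adjacent_rows = con.sql("SELECT COUNT(*) FROM read_csv_auto('RESULT.CSV')").fetchall()[0][0]
print(full_rows, adjacent_rows)

# Pairs found by the full self-join but missed by the adjacent-only comparison
con.sql("""
    SELECT f.*
    FROM read_csv_auto('RESULT2.CSV') f
    LEFT JOIN read_csv_auto('RESULT.CSV') a
      ON f.pre_uuid = a.pre_uuid AND f.next_uuid = a.next_uuid
    WHERE a.pre_uuid IS NULL
    LIMIT 10
""").show()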