1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
|
# 先在duckdb中进行数据清洗
# --第一次清洗
# -- 其他字段相同的情况下,保留pid非空的记录
# CREATE TABLE datas_clean AS SELECT * FROM datas
# DELETE FROM datas_clean
# WHERE pid IS NULL AND phone IN (
# SELECT phone
# FROM datas
# WHERE pid IS NOT NULL
# ) AND id IN (
# SELECT id
# FROM datas
# WHERE pid IS NOT NULL
# );
# --第二次清洗
# --基于id和pid相同合并phone列,保存到datas_clean_clean表中
# CREATE TABLE datas_clean_clean AS
# SELECT
# MAX(credit) AS credit,
# MAX(company) AS company,
# MAX(name) AS name,
# array_agg(phone) AS phones,
# id,
# pid,
# MAX(rootid) AS rootid
# FROM
# datas_clean
# GROUP BY
# id,
# pid;
import duckdb
import pandas as pd
df = pd.DataFrame(columns=['id', 'rootid', 'phones'])
def find_related_ids(current_id,phone):
id_phone_dict = {current_id:[phone]}
def find_son_ids(current_id):
nonlocal id_phone_dict # 修改函数外部变量
# 父 ID 等于上面 ID 的 phone, 即儿子的 ID
conn.execute(f"select id, phones from temp_table{phone} where pid='{current_id}';")
son_ids = conn.fetchall()
for son_id_tuple in son_ids:
son_id, son_phone = son_id_tuple
if son_id:
id_phone_dict[son_id] = son_phone
find_son_ids(son_id)
def find_parent_ids(current_id):
nonlocal id_phone_dict # 修改函数外部变量
# id 等于上面 PID 的 phone, 即父亲的 ID
conn.execute(f"select pid, phones from temp_table{phone} where id='{current_id}';")
parent_ids = conn.fetchall()
for pid_tuple in parent_ids:
parent_id, parent_phone = pid_tuple
if parent_id:
# path_ids.insert(0, parent_id) # 在字典前方插入父亲 ID,保持父辈在前,子辈在后的顺序
temp_id_phone_dict = {parent_id:parent_phone}
temp_id_phone_dict.update(id_phone_dict)
id_phone_dict= temp_id_phone_dict
# id_phone_dict[parent_id] = parent_phone
find_parent_ids(parent_id)
find_son_ids(current_id)
# find_parent_ids(current_id)
return id_phone_dict
def add_path(init_phone:str):
global df
# start = time.time()
"""找出特定phone的上下级路径并形成路径后保存到path列"""
# 找出phone对应的credit,然后再根据credit找出同样credit的所有行,并将结果保存到新建的临时表中,递归查询关系时只在当前credit中查找,大量减少时间。
sqlstr = f"CREATE OR REPLACE TABLE temp_table{init_phone} AS SELECT * FROM datas_clean_clean WHERE credit IN (SELECT credit FROM datas_clean_clean WHERE list_contains(phones,'{init_phone}'));"
conn.sql(
sqlstr
)
# phone 对应的所有 id
conn.execute(f"select DISTINCT id from datas_clean_clean where list_contains(phones,'{init_phone}')")
current_ids = [i[0] for i in conn.fetchall()]
print('# 该phone共对应ID:',end='')
print(current_ids)
# 以这些id为基础,进行id所在树的还原
current_ids_path = []
for current_id in current_ids:
temp_path = find_related_ids(current_id,init_phone)
# current_ids_path = current_ids_path.append(temp_path)
# if current_id in
id_phone_dict = temp_path
if len(id_phone_dict)>1:
print(id_phone_dict)
# 将字典转换为dataframe
tmp_df = pd.DataFrame(list(id_phone_dict.items()), columns=['id', 'phone'])
# 添加rootid列
tmp_df['rootid'] = list(id_phone_dict.keys())[0]
# print('\n','->'.join(list(id_phone_dict.keys())),'\n')
# df = df.append(tmp_df, ignore_index=True)
df = pd.concat([tmp_df, df],ignore_index=True)
with duckdb.connect(r'E:\数据分析\zfrz.db') as conn:
phones = ['3116','9688','4892']
for phone in phones:
add_path(phone)
print(df)
#df 写入临时表
conn.sql("DROP TABLE IF EXISTS mapping;CREATE TEMP TABLE mapping AS SELECT * FROM df;")
# # 根据映射写入表中
conn.sql(f"UPDATE datas_clean_clean SET rootid=mapping.rootid FROM mapping WHERE datas_clean_clean.id =mapping.id")
conn.execute("SELECT credit, company, name ,phones, id, pid, rootid FROM datas_clean_clean WHERE rootid NOT NULL GROUP BY credit, company, name ,phones, id, pid, rootid;")
otherdf = conn.fetch_df()
otherdf.to_csv('tree.csv',index=False)
# neo4j导入节点
# LOAD CSV WITH HEADERS FROM 'file:///tree.csv' AS row
# CREATE (n:ZFRZ {
# credit: row.credit,
# company: row.company,
# name: row.name,
# phones: row.phones,
# id: row.id,
# pid: row.pid,
# rootid: row.rootid
# })
# neo4j 创建关系
# LOAD CSV WITH HEADERS FROM 'file:///tree.csv' AS row
# // Find the parent node (B) using the pid value
# MATCH (parent:ZFRZ {id: row.pid})
# // Find the child node (A) using the id value
# MATCH (child:ZFRZ {id: row.id})
# // Create the relationship from parent to child
# CREATE (parent)-[:SON]->(child)
|