在python中,我们可以使用yield关键字和for循环实现迭代读取超大文本文件,每次读取1000行。
例如,我们可以定义一个函数read_lines_by_1000(),每次yield 1000行的数据:
1
2
3
4
5
6
7
8
9
def read_lines_by_1000 ( filename ):
with open ( filename , 'r' ) as f :
lines = []
for line in f :
lines . append ( line )
if len ( lines ) == 1000 :
yield lines
lines = []
yield lines
然后,我们可以使用for循环迭代读取超大文本文件:
1
2
3
4
filename = 'data.txt'
for lines in read_lines_by_1000 ( filename ):
print ( lines )
# 处理lines中的数据
这样就可以在python中迭代读取超大文本文件,每次读取1000行了。
使用yield关键字和for循环迭代读取超大文本文件,每次读取1000行的方法已经是目前效率最高的方法之一。
如果想要提升效率,还可以考虑以下几种方法:
尽量使用内存映射文件读取超大文本文件,这样可以避免将数据从磁盘读取到内存中,从而提升读取速度。
尽量使用多线程读取超大文本文件,从而提升读取速度。
尽量使用多进程读取超大文本文件,从而提升读取速度。
尽量使用单个较大的缓冲区读取超大文本文件,从而提升读取速度。
需要注意的是,使用上述方法时可能会带来一些问题,例如内存占用过大,多线程/多进程同步问题等。因此,需要根据实际情况选择适合的方法。
在python中,可以使用以下几种方法正确处理超大文本文件,防止内存吃不消:
使用yield关键字和for循环迭代读取超大文本文件,每次读取一定数量的行。这样可以避免将整个文件一次性读取到内存中,从而防止内存吃不消。
使用内存映射文件读取超大文本文件。这样可以避免将数据从磁盘读取到内存中,从而防止内存吃不消。
使用多线程/多进程读取超大文本文件。这样可以分担读取超大文本文件的负担,从而防止内存吃不消。
使用单个较大的缓冲区读取超大文本文件。这样可以减少读取超大文本文件时的系统调用次数,从而提升读取速度,防止内存吃不消。
需要注意的是,使用上述方法时可能会带来一些问题,例如内存占用过大,多线程/多进程同步问题等。因此,需要根据实际情况选择适合的方法。
在python中,可以使用mmap模块来实现内存映射文件。
例如,我们可以使用mmap.mmap()函数将文件映射到内存中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import mmap
filename = 'data.txt'
# 使用mmap.mmap()函数将文件映射到内存中
with open ( filename , 'r+b' ) as f :
mm = mmap . mmap ( f . fileno (), 0 )
# 使用mm对象读取文件内容
print ( mm . read ())
# 使用mm对象修改文件内容
mm . seek ( 0 )
mm . write ( b 'new data' )
# 关闭mmap对象
mm . close ()
这样就可以使用mmap模块实现内存映射文件了。
在python中,可以使用mmap模块来实现内存映射文件,然后使用for循环迭代读取超大文本文件。
例如,我们可以使用mmap.mmap()函数将文件映射到内存中,然后使用for循环迭代读取超大文本文件:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import mmap
filename = 'data.txt'
# 使用mmap.mmap()函数将文件映射到内存中
with open ( filename , 'r+b' ) as f :
mm = mmap . mmap ( f . fileno (), 0 )
# 使用for循环迭代读取超大文本文件
for line in mm :
print ( line )
# 关闭mmap对象
mm . close ()
这样就可以使用内存映射处理超大文件了。
另外,我们也可以使用mmap.mmap()函数将文件映射到内存中,然后使用yield关键字和for循环迭代读取超大文本文件,每次读取一定数量的行。例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import mmap
# 下面两个函数二选一
def read_lines_by_num ( filename , line_num ):
with open ( filename , 'r+b' ) as f :
with mmap . mmap ( f . fileno (), 0 ) as mm :
lines = []
for line in iter ( mm . readline , b "" ):
lines . append ( bytes . decode ( line )) #使用bytes.decode将byte字符串转为普通字符串
if len ( lines ) == line_num :
yield lines
lines = []
# 返回剩余不足line_num的行
yield lines
def read_data ( file_path , chunk_size ):
with open ( file_path , "r+" ) as f :
m = mmap . mmap ( f . fileno (), 0 )
g_index = 0
lines = []
for index , char in enumerate ( m ):
if char == b " \n " :
lines . append ( m [ g_index : index + 1 ] . decode ())
g_index = index + 1
if len ( lines ) == chunk_size :
yield lines
lines = []
# 返回剩余不足chunk_size的行
yield lines
filename = r '五亿数据.txt'
for lines in read_lines_by_num ( filename , 1000000 ):
print ( lines )
# 处理lines中的数据
这样就可以使用内存映射处理超大文本文件,并使用yield关键字和for循环迭代读取超大文本文件,每次读取一定数量的行了。
在python中,可以使用mmap模块将文件映射到内存中,然后使用多进程和多线程同时读取超大文本文件。
具体实现方法如下:
首先,我们使用mmap.mmap()函数将文件映射到内存中:
1
2
3
4
5
6
7
import mmap
filename = 'data.txt'
# 使用mmap.mmap()函数将文件映射到内存中
with open ( filename , 'r+b' ) as f :
mm = mmap . mmap ( f . fileno (), 0 )
然后,我们使用多进程和多线程同时读取超大文本文件。在这里,我们使用multiprocessing模块的Queue类和threading模块的Queue类来解决同步问题:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import iprocessing
import iprocessing.queues
import threading
import queue
def read_lines ( q , start , end ):
"""读取超大文本文件的函数"""
lines = []
for i , line in enumerate ( mm ):
if i < start :
continue
if i >= end :
break
lines . append ( line )
q . put ( lines )
# 使用多进程和多线程同时读取超大文本文件
q = queue . Queue ()
threads = []
for i in range ( 10 ):
start = i * 1000
end = ( i + 1 ) * 1000
t = threading . Thread ( target = read_lines , args = ( q , start , end ))
t . start ()
threads . append ( t )
p = iprocessing . Process ( target = read_lines , args = ( q , 10000 , 20000 ))
p . start ()
for t in threads :
t . join ()
p . join ()
while not q . empty ():
lines = q . get ()
print ( lines )
# 处理lines中的数据
# 关闭mmap对象
mm . close ()
原文参见:
下面是ChatGPT对 的详细注释:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import os
from mmap import mmap
def removeLine ( filename , lineno ):
# 打开文件,并以读写模式打开
# os.O_RDWR 表示以读写模式打开文件
f = os . open ( filename , os . O_RDWR )
# 创建内存映射文件
# mmap 函数的第一个参数是文件描述符,第二个参数是内存映射文件的大小。
# 当第二个参数设置为 0 时,内存映射文件的大小会自动调整为文件的大小。
m = mmap ( f , 0 )
# 定位要删除的行的位置
# p 是要删除的行的开始位置,q 是要删除的行的结束位置。
p = 0
for i in range ( lineno - 1 ):
# 使用 m.find 函数定位下一行的开始位置
# 当 p 为 0 时,表示在内存映射文件的开头查找。
# 当 p 不为 0 时,表示在 p 之后查找。
p = m . find ( ' \n ' , p ) + 1
# 定位要删除的行的结束位置
q = m . find ( ' \n ' , p )
# 删除行
# 使用 m[p:q] 获取要删除的行的内容,然后将其赋值为空字符串。
m [ p : q ] = ' ' * ( q - p )
# 关闭文件
os . close ( f )
的注释:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# You can have two file objects for the same file at the same time (one for reading, one for writing):
# 你可以在同一时间内拥有同一文件的两个文件对象(一个用于读取,一个用于写入)。
def removeLine ( filename , lineno ):
# 以只读模式打开文件
fro = open ( filename , "rb" )
# 读取文件的行数
current_line = 0
# 循环,直到读取的行数等于要删除的行数
while current_line < lineno :
# 使用 fro.readline() 读取一行
fro . readline ()
# 读取的行数加 1
current_line += 1
# 记录当前文件指针的位置
seekpoint = fro . tell ()
# 以读写模式打开文件
frw = open ( filename , "r+b" )
# 移动文件指针到 seekpoint 处
frw . seek ( seekpoint , 0 )
# 读取要删除的行
fro . readline ()
# 将文件中剩余的内容向前移动一行
# 使用 fro.readline() 读取一行
chars = fro . readline ()
# 循环,直到读取到文件末尾
while chars :
# 使用 frw.writelines() 将读取到的内容写入文件
frw . writelines ( chars )
# 继续读取下一行
chars = fro . readline ()
# 关闭文件
fro . close ()
frw . truncate ()
frw . close ()
在golang中,可以使用bufio模块的Scanner类来优雅的处理超大文本文件。
具体实现方法如下:
首先,我们使用os.Open()函数打开文本文件:
1
2
3
4
5
f , err := os . Open ( "data.txt" )
if err != nil {
fmt . Println ( err )
return
}
然后,我们使用bufio.NewScanner()函数创建Scanner对象:
1
scanner := bufio . NewScanner ( f )
接下来,我们使用Scanner.Scan()函数读取文本文件的每一行数据:
1
2
3
4
5
for scanner . Scan () {
line := scanner . Text ()
fmt . Println ( line )
// 处理line中的数据
}
最后,我们使用Scanner.Err()函数检查是否出现错误:
1
2
3
if err := scanner . Err (); err != nil {
fmt . Println ( err )
}
通过使用Scanner类,我们可以优雅地处理超大文本文件。
此外,我们也可以使用多协程来加速文本文件的读取。在这里,我们可以使用channel来解决协程间同步问题:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import (
"bufio"
"fmt"
"os"
)
func readLines ( ch chan string , start int , end int ) {
f , err := os . Open ( "data.txt" )
if err != nil {
fmt . Println ( err )
return
}
defer f . Close ()
scanner := bufio . NewScanner ( f )
for i := 0 ; scanner . Scan (); i ++ {
if i < start {
continue
}
if i >= end {
break
}
line := scanner . Text ()
ch <- line
}
close ( ch )
}
func main () {
chs := make ([] chan string , 10 )
for i := range chs {
chs [ i ] = make ( chan string )
go readLines ( chs [ i ], i * 1000 , ( i + 1 ) * 1000 )
}
for _ , ch := range chs {
for line := range ch {
fmt . Println ( line )
// 处理line中的数据
}
}
}
上述代码中,我们使用了channel来解决协程间同步问题。每个协程读取超大文本文件的一部分数据,然后将读取的数据存储到channel中。最后,主协程从channel中取出数据,处理完后关闭channel。
对于每次读取一行的效率问题,我们可以使用bufio.NewReader()函数创建一个bufio.Reader对象,然后使用bufio.Reader.ReadString()函数读取指定行数的数据。
具体实现方法如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import (
"bufio"
"fmt"
"os"
)
func readLines ( start int , end int ) ( lines [] string , err error ) {
f , err := os . Open ( "data.txt" )
if err != nil {
return
}
defer f . Close ()
reader := bufio . NewReader ( f )
for i := 0 ; i < end ; i ++ {
line , err := reader . ReadString ( '\n' )
if err != nil {
return lines , err
}
if i < start {
continue
}
lines = append ( lines , line )
}
return
}
func main () {
lines , err := readLines ( 0 , 1000 )
if err != nil {
fmt . Println ( err )
return
}
for _ , line := range lines {
fmt . Println ( line )
// 处理line中的数据
}
}
上述代码中,我们使用bufio.Reader.ReadString()函数读取指定行数的数据,并将读取的数据存储到lines列表中。
此外,我们还可以使用多协程加速读取文本文件的速度。在这里,我们可以使用channel来解决协程间同步问题:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import (
"bufio"
"fmt"
"os"
)
func readLines ( start int , end int ) ( lines [] string , err error ) {
f , err := os . Open ( "data.txt" )
if err != nil {
return
}
defer f . Close ()
reader := bufio . NewReader ( f )
for i := 0 ; i < end ; i ++ {
line , err := reader . ReadString ( '\n' )
if err != nil {
return lines , err
}
if i < start {
continue
}
lines = append ( lines , line )
}
return
}
func readLinesFromChannel ( ch chan [] string , start int , end int ) {
lines , err := readLines ( start , end )
if err != nil {
ch <- nil
return
}
ch <- lines
}
func main () {
chs := make ([] chan [] string , 10 )
for i := range chs {
chs [ i ] = make ( chan [] string )
go readLinesFromChannel ( chs [ i ], i * 1000 , ( i + 1 ) * 1000 )
}
for _ , ch := range chs {
lines := <- ch
if lines == nil {
continue
}
for _ , line := range lines {
fmt . Println ( line )
// 处理line中的数据
}
}
}
通过使用bufio.Reader.ReadString()函数一次读取指定行数的数据,以及使用多协程加读取文本文件的速度,我们可以大幅提升读取超大文本文件的效率。
希望这次的代码能够满足您的需求。如果您有任何疑问,请随时联系我。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package main
import (
"bufio"
"io"
"log"
"os"
"strings"
"syscall"
)
const (
// chunkSize 是每次读取的行数
chunkSize = 1000
// 分隔符
delimiter = "---"
)
func main () {
// 打开文件
f , err := os . Open ( "large.txt" )
if err != nil {
log . Fatal ( err )
}
defer f . Close ()
// 获取文件信息
fi , err := f . Stat ()
if err != nil {
log . Fatal ( err )
}
// 内存映射文件
size := fi . Size ()
prot := syscall . PROT_READ
flags := syscall . MAP_PRIVATE
fd := int ( f . Fd ())
offset := 0
b , err := syscall . Mmap ( fd , int64 ( offset ), int ( size ), prot , flags )
if err != nil {
log . Fatal ( err )
}
defer syscall . Munmap ( b )
// 创建三个文件用于存储结果
lessThanSeven , err := os . Create ( "less_than_seven.txt" )
if err != nil {
log . Fatal ( err )
}
defer lessThanSeven . Close ()
equalSeven , err := os . Create ( "equal_seven.txt" )
if err != nil {
log . Fatal ( err )
}
defer equalSeven . Close ()
greaterThanSeven , err := os . Create ( "greater_than_seven.txt" )
if err != nil {
log . Fatal ( err )
}
defer greaterThanSeven . Close ()
// 使用bufio包的NewReader函数创建一个带缓冲的读取器
r := bufio . NewReader ( f )
// 读取文件
for {
// 读取chunkSize行
chunk := make ([] string , 0 , chunkSize )
for i := 0 ; i < chunkSize ; i ++ {
line , err := r . ReadString ( '\n' )
if err == io . EOF {
// 文件结尾
break
} else if err != nil {
log . Fatal ( err )
}
chunk = append ( chunk , line )
}
// 处理读取的行
for _ , line := range chunk {
columns := strings . Split ( line , delimiter )
numColumns := len ( columns )
if numColumns < 7 {
// 列数小于7
_ , err := lessThanSeven . WriteString ( line )
if err != nil {
log . Fatal ( err )
}
} else if numColumns == 7 {
// 列数等于7
_ , err := equalSeven . WriteString ( line )
if err != nil {
log . Fatal ( err )
}
} else {
// 列数大于7
_ , err := greaterThanSeven . WriteString ( line )
if err != nil {
log . Fatal ( err )
}
}
}
// 如果读取的行数不足chunkSize,说明文件已经读完
if len ( chunk ) < chunkSize {
break
}
}
}