Python和Golang超大文本文件处理

在python中,我们可以使用yield关键字和for循环实现迭代读取超大文本文件,每次读取1000行。

例如,我们可以定义一个函数read_lines_by_1000(),每次yield 1000行的数据:

1
2
3
4
5
6
7
8
9
def read_lines_by_1000(filename):
    with open(filename, 'r') as f:
        lines = []
        for line in f:
            lines.append(line)
            if len(lines) == 1000:
                yield lines
                lines = []
        yield lines

然后,我们可以使用for循环迭代读取超大文本文件:

1
2
3
4
filename = 'data.txt'
for lines in read_lines_by_1000(filename):
    print(lines)
    # 处理lines中的数据

这样就可以在python中迭代读取超大文本文件,每次读取1000行了。 使用yield关键字和for循环迭代读取超大文本文件,每次读取1000行的方法已经是目前效率最高的方法之一。

如果想要提升效率,还可以考虑以下几种方法:

  1. 尽量使用内存映射文件读取超大文本文件,这样可以避免将数据从磁盘读取到内存中,从而提升读取速度。

  2. 尽量使用多线程读取超大文本文件,从而提升读取速度。

  3. 尽量使用多进程读取超大文本文件,从而提升读取速度。

  4. 尽量使用单个较大的缓冲区读取超大文本文件,从而提升读取速度。

需要注意的是,使用上述方法时可能会带来一些问题,例如内存占用过大,多线程/多进程同步问题等。因此,需要根据实际情况选择适合的方法。

在python中,可以使用以下几种方法正确处理超大文本文件,防止内存吃不消:

  1. 使用yield关键字和for循环迭代读取超大文本文件,每次读取一定数量的行。这样可以避免将整个文件一次性读取到内存中,从而防止内存吃不消。

  2. 使用内存映射文件读取超大文本文件。这样可以避免将数据从磁盘读取到内存中,从而防止内存吃不消。

  3. 使用多线程/多进程读取超大文本文件。这样可以分担读取超大文本文件的负担,从而防止内存吃不消。

  4. 使用单个较大的缓冲区读取超大文本文件。这样可以减少读取超大文本文件时的系统调用次数,从而提升读取速度,防止内存吃不消。

需要注意的是,使用上述方法时可能会带来一些问题,例如内存占用过大,多线程/多进程同步问题等。因此,需要根据实际情况选择适合的方法。

在python中,可以使用mmap模块来实现内存映射文件。

例如,我们可以使用mmap.mmap()函数将文件映射到内存中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import mmap

filename = 'data.txt'

# 使用mmap.mmap()函数将文件映射到内存中
with open(filename, 'r+b') as f:
    mm = mmap.mmap(f.fileno(), 0)

# 使用mm对象读取文件内容
print(mm.read())

# 使用mm对象修改文件内容
mm.seek(0)
mm.write(b'new data')

# 关闭mmap对象
mm.close()

这样就可以使用mmap模块实现内存映射文件了。

在python中,可以使用mmap模块来实现内存映射文件,然后使用for循环迭代读取超大文本文件。

例如,我们可以使用mmap.mmap()函数将文件映射到内存中,然后使用for循环迭代读取超大文本文件:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import mmap

filename = 'data.txt'

# 使用mmap.mmap()函数将文件映射到内存中
with open(filename, 'r+b') as f:
    mm = mmap.mmap(f.fileno(), 0)

# 使用for循环迭代读取超大文本文件
for line in mm:
    print(line)

# 关闭mmap对象
mm.close()

这样就可以使用内存映射处理超大文件了。 另外,我们也可以使用mmap.mmap()函数将文件映射到内存中,然后使用yield关键字和for循环迭代读取超大文本文件,每次读取一定数量的行。例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import mmap

# 下面两个函数二选一
def read_lines_by_num(filename,line_num):
    with open(filename,'r+b') as f:
        with mmap.mmap(f.fileno(),0) as mm:
            lines=[]
            for line in iter(mm.readline,b""):
                lines.append(bytes.decode(line)) #使用bytes.decode将byte字符串转为普通字符串
                if len(lines)==line_num:
                    yield lines
                    lines=[]
            # 返回剩余不足line_num的行
            yield lines


def read_data(file_path,chunk_size):
    with open(file_path, "r+") as f:
        m = mmap.mmap(f.fileno(), 0)
        g_index = 0
        lines = []
        for index, char in enumerate(m):
            if char == b"\n":
                lines.append(m[g_index:index + 1].decode())
                g_index = index + 1
                if len(lines) == chunk_size:
                    yield lines
                    lines = []
        # 返回剩余不足chunk_size的行
        yield lines

filename=r'五亿数据.txt'
for lines in read_lines_by_num(filename,1000000):
    print(lines)
    # 处理lines中的数据

这样就可以使用内存映射处理超大文本文件,并使用yield关键字和for循环迭代读取超大文本文件,每次读取一定数量的行了。

在python中,可以使用mmap模块将文件映射到内存中,然后使用多进程和多线程同时读取超大文本文件。

具体实现方法如下:

首先,我们使用mmap.mmap()函数将文件映射到内存中:

1
2
3
4
5
6
7
import mmap

filename = 'data.txt'

# 使用mmap.mmap()函数将文件映射到内存中
with open(filename, 'r+b') as f:
    mm = mmap.mmap(f.fileno(), 0)

然后,我们使用多进程和多线程同时读取超大文本文件。在这里,我们使用multiprocessing模块的Queue类和threading模块的Queue类来解决同步问题:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import iprocessing
import iprocessing.queues
import threading
import queue

def read_lines(q, start, end):
    """读取超大文本文件的函数"""
    lines = []
    for i, line in enumerate(mm):
        if i < start:
            continue
        if i >= end:
            break
        lines.append(line)
    q.put(lines)

# 使用多进程和多线程同时读取超大文本文件
q = queue.Queue()
threads = []

for i in range(10):
    start = i * 1000
    end = (i + 1) * 1000
    t = threading.Thread(target=read_lines, args=(q, start, end))
    t.start()
    threads.append(t)

p = iprocessing.Process(target=read_lines, args=(q, 10000, 20000))
p.start()

for t in threads:
    t.join()

p.join()

while not q.empty():
    lines = q.get()
    print(lines)
    # 处理lines中的数据

# 关闭mmap对象
mm.close()

原文参见:

下面是ChatGPT
的详细注释:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import os
from mmap import mmap

def removeLine(filename, lineno):
    # 打开文件,并以读写模式打开
    # os.O_RDWR 表示以读写模式打开文件
    f = os.open(filename, os.O_RDWR)
    
    # 创建内存映射文件
    # mmap 函数的第一个参数是文件描述符,第二个参数是内存映射文件的大小。
    # 当第二个参数设置为 0 时,内存映射文件的大小会自动调整为文件的大小。
    m = mmap(f, 0)
    
    # 定位要删除的行的位置
    # p 是要删除的行的开始位置,q 是要删除的行的结束位置。
    p = 0
    for i in range(lineno-1):
        # 使用 m.find 函数定位下一行的开始位置
        # 当 p 为 0 时,表示在内存映射文件的开头查找。
        # 当 p 不为 0 时,表示在 p 之后查找。
        p = m.find('\n', p) + 1
    # 定位要删除的行的结束位置
    q = m.find('\n', p)
    
    # 删除行
    # 使用 m[p:q] 获取要删除的行的内容,然后将其赋值为空字符串。
    m[p:q] = ' ' * (q - p)
    
    # 关闭文件
    os.close(f)

的注释:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# You can have two file objects for the same file at the same time (one for reading, one for writing):
# 你可以在同一时间内拥有同一文件的两个文件对象(一个用于读取,一个用于写入)。
def removeLine(filename, lineno):
    # 以只读模式打开文件
    fro = open(filename, "rb")

    # 读取文件的行数
    current_line = 0
    # 循环,直到读取的行数等于要删除的行数
    while current_line < lineno:
        # 使用 fro.readline() 读取一行
        fro.readline()
        # 读取的行数加 1
        current_line += 1

    # 记录当前文件指针的位置
    seekpoint = fro.tell()

    # 以读写模式打开文件
    frw = open(filename, "r+b")
    # 移动文件指针到 seekpoint 处
    frw.seek(seekpoint, 0)

    # 读取要删除的行
    fro.readline()

    # 将文件中剩余的内容向前移动一行
    # 使用 fro.readline() 读取一行
    chars = fro.readline()
    # 循环,直到读取到文件末尾
    while chars:
        # 使用 frw.writelines() 将读取到的内容写入文件
        frw.writelines(chars)
        # 继续读取下一行
        chars = fro.readline()

    # 关闭文件
    fro.close()
    frw.truncate()
    frw.close()

在golang中,可以使用bufio模块的Scanner类来优雅的处理超大文本文件。

具体实现方法如下:

首先,我们使用os.Open()函数打开文本文件:

1
2
3
4
5
f, err := os.Open("data.txt")
if err != nil {
    fmt.Println(err)
    return
}

然后,我们使用bufio.NewScanner()函数创建Scanner对象:

1
scanner := bufio.NewScanner(f)

接下来,我们使用Scanner.Scan()函数读取文本文件的每一行数据:

1
2
3
4
5
for scanner.Scan() {
    line := scanner.Text()
    fmt.Println(line)
    // 处理line中的数据
}

最后,我们使用Scanner.Err()函数检查是否出现错误:

1
2
3
if err := scanner.Err(); err != nil {
    fmt.Println(err)
}

通过使用Scanner类,我们可以优雅地处理超大文本文件。

此外,我们也可以使用多协程来加速文本文件的读取。在这里,我们可以使用channel来解决协程间同步问题:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import (
    "bufio"
    "fmt"
    "os"
)

func readLines(ch chan string, start int, end int) {
    f, err := os.Open("data.txt")
    if err != nil {
        fmt.Println(err)
        return
    }
    defer f.Close()

    scanner := bufio.NewScanner(f)
    for i := 0; scanner.Scan(); i++ {
        if i < start {
            continue
        }
        if i >= end {
            break
        }
        line := scanner.Text()
        ch <- line
    }
    close(ch)
}

func main() {
    chs := make([]chan string, 10)
    for i := range chs {
        chs[i] = make(chan string)
        go readLines(chs[i], i*1000, (i+1)*1000)
    }

    for _, ch := range chs {
        for line := range ch {
            fmt.Println(line)
            // 处理line中的数据
        }
    }
}

上述代码中,我们使用了channel来解决协程间同步问题。每个协程读取超大文本文件的一部分数据,然后将读取的数据存储到channel中。最后,主协程从channel中取出数据,处理完后关闭channel。

对于每次读取一行的效率问题,我们可以使用bufio.NewReader()函数创建一个bufio.Reader对象,然后使用bufio.Reader.ReadString()函数读取指定行数的数据。

具体实现方法如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import (
    "bufio"
    "fmt"
    "os"
)

func readLines(start int, end int) (lines []string, err error) {
    f, err := os.Open("data.txt")
    if err != nil {
        return
    }
    defer f.Close()

    reader := bufio.NewReader(f)
    for i := 0; i < end; i++ {
        line, err := reader.ReadString('\n')
        if err != nil {
            return lines, err
        }
        if i < start {
            continue
        }
        lines = append(lines, line)
    }
    return
}

func main() {
    lines, err := readLines(0, 1000)
    if err != nil {
        fmt.Println(err)
        return
    }
    for _, line := range lines {
        fmt.Println(line)
        // 处理line中的数据
    }
}

上述代码中,我们使用bufio.Reader.ReadString()函数读取指定行数的数据,并将读取的数据存储到lines列表中。

此外,我们还可以使用多协程加速读取文本文件的速度。在这里,我们可以使用channel来解决协程间同步问题:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import (
    "bufio"
    "fmt"
    "os"
)

func readLines(start int, end int) (lines []string, err error) {
    f, err := os.Open("data.txt")
    if err != nil {
        return
    }
    defer f.Close()

    reader := bufio.NewReader(f)
    for i := 0; i < end; i++ {
        line, err := reader.ReadString('\n')
        if err != nil {
            return lines, err
        }
        if i < start {
            continue
        }
        lines = append(lines, line)
    }
    return
}

func readLinesFromChannel(ch chan []string, start int, end int) {
    lines, err := readLines(start, end)
    if err != nil {
        ch <- nil
        return
    }
    ch <- lines
}

func main() {
    chs := make([]chan []string, 10)
    for i := range chs {
        chs[i] = make(chan []string)
        go readLinesFromChannel(chs[i], i*1000, (i+1)*1000)
    }

    for _, ch := range chs {
        lines := <-ch
        if lines == nil {
            continue
        }
        for _, line := range lines {
            fmt.Println(line)
            // 处理line中的数据
        }
    }
}

通过使用bufio.Reader.ReadString()函数一次读取指定行数的数据,以及使用多协程加读取文本文件的速度,我们可以大幅提升读取超大文本文件的效率。 希望这次的代码能够满足您的需求。如果您有任何疑问,请随时联系我。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
package main

import (
	"bufio"
	"io"
	"log"
	"os"
	"strings"
	"syscall"
)

const (
	// chunkSize 是每次读取的行数
	chunkSize = 1000
	// 分隔符
	delimiter = "---"
)

func main() {
	// 打开文件
	f, err := os.Open("large.txt")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// 获取文件信息
	fi, err := f.Stat()
	if err != nil {
		log.Fatal(err)
	}

	// 内存映射文件
	size := fi.Size()
	prot := syscall.PROT_READ
	flags := syscall.MAP_PRIVATE
	fd := int(f.Fd())
	offset := 0
	b, err := syscall.Mmap(fd, int64(offset), int(size), prot, flags)
	if err != nil {
		log.Fatal(err)
	}
	defer syscall.Munmap(b)

	// 创建三个文件用于存储结果
	lessThanSeven, err := os.Create("less_than_seven.txt")
	if err != nil {
		log.Fatal(err)
	}
	defer lessThanSeven.Close()
	equalSeven, err := os.Create("equal_seven.txt")
	if err != nil {
		log.Fatal(err)
	}
	defer equalSeven.Close()
	greaterThanSeven, err := os.Create("greater_than_seven.txt")
	if err != nil {
		log.Fatal(err)
	}
	defer greaterThanSeven.Close()

	// 使用bufio包的NewReader函数创建一个带缓冲的读取器
	r := bufio.NewReader(f)

	// 读取文件
	for {
		// 读取chunkSize行
		chunk := make([]string, 0, chunkSize)
		for i := 0; i < chunkSize; i++ {
			line, err := r.ReadString('\n')
			if err == io.EOF {
				// 文件结尾
				break
			} else if err != nil {
				log.Fatal(err)
			}
			chunk = append(chunk, line)
		}

		// 处理读取的行
		for _, line := range chunk {
			columns := strings.Split(line, delimiter)
			numColumns := len(columns)
			if numColumns < 7 {
				// 列数小于7
				_, err := lessThanSeven.WriteString(line)
				if err != nil {
					log.Fatal(err)
				}
			} else if numColumns == 7 {
				// 列数等于7
				_, err := equalSeven.WriteString(line)
				if err != nil {
					log.Fatal(err)
				}
			} else {
				// 列数大于7
				_, err := greaterThanSeven.WriteString(line)
				if err != nil {
					log.Fatal(err)
				}
			}
		}

		// 如果读取的行数不足chunkSize,说明文件已经读完
		if len(chunk) < chunkSize {
			break
		}
	}

}