-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathfvecs_shard.py
More file actions
118 lines (100 loc) · 4.1 KB
/
fvecs_shard.py
File metadata and controls
118 lines (100 loc) · 4.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#!/usr/bin/env python3
import os
import struct
import argparse
import sys
# Usage:
# ./fvecs_shard.py mydata.fvecs 1000000 --output-dir /shards
# Produces:
# mydata_1000000_0.fvecs
# mydata_1000000_1.fvecs
# …
# mydata_1000000_{N-1}.fvecs
# mydata_{R}_{N}.fvecs # R < 1000000 is the final remainder
#
# Must specify absolute output path to write shards to (which will be created if it doesn't exist):
def fvecs_shard(input_path: str, shard_size: int, output_dir: str):
# ensure output directory exists
os.makedirs(output_dir, exist_ok=True)
# verify write permission
if not os.access(output_dir, os.W_OK):
raise PermissionError(f"Cannot write to output directory {output_dir!r}: permission denied")
# use only the filename for naming shards
filename = os.path.basename(input_path)
base, ext = os.path.splitext(filename)
# --- read dimension from header
with open(input_path, "rb") as f:
hdr = f.read(4)
if len(hdr) < 4:
raise ValueError("Input file is empty or too small.")
dim = struct.unpack("<i", hdr)[0]
record_size = 4 + dim * 4 # 4 bytes for int32 + 4 bytes per float
# --- compute total vectors
total_bytes = os.path.getsize(input_path)
if total_bytes % record_size != 0:
raise ValueError(
f"File size ({total_bytes}) is not a multiple of record size ({record_size})."
)
total_vectors = total_bytes // record_size
full_shards = total_vectors // shard_size
remainder = total_vectors % shard_size
print(f"Input has {total_vectors:,} vectors (dim={dim}).")
print(f"Creating {full_shards} full shards of {shard_size} vectors each", end="")
if remainder:
print(f", plus 1 final shard of {remainder} vectors.")
else:
print(".")
# --- stream through and write out each shard
with open(input_path, "rb") as fin:
for shard_id in range(full_shards):
out_name = f"{base}_{shard_size}_{shard_id}{ext}"
out_path = os.path.join(output_dir, out_name)
with open(out_path, "wb") as fout:
for _ in range(shard_size):
data = fin.read(record_size)
if len(data) != record_size:
raise EOFError("Unexpected EOF in a full shard.")
fout.write(data)
print(f" ▸ Wrote shard {shard_id}: {out_path}")
if remainder:
out_name = f"{base}_{remainder}_{full_shards}{ext}"
out_path = os.path.join(output_dir, out_name)
with open(out_path, "wb") as fout:
for _ in range(remainder):
data = fin.read(record_size)
if len(data) != record_size:
raise EOFError("Unexpected EOF in remainder shard.")
fout.write(data)
print(f" ▸ Wrote final shard {full_shards}: {out_path}")
def main():
parser = argparse.ArgumentParser(
description="Split a .fvecs file into shards of S vectors each."
)
parser.add_argument("input_file", help="path to the input .fvecs file")
parser.add_argument("shard_size", type=int, help="number of vectors per shard (S)")
parser.add_argument(
"--output-dir", "-o",
required=True,
help="absolute path to the directory to write shards into"
)
args = parser.parse_args()
# ensure shard_size is positive
if args.shard_size <= 0:
print("ERROR: shard_size must be a positive integer.", file=sys.stderr)
sys.exit(1)
# check input file
if not os.path.isfile(args.input_file):
print(f"ERROR: Input file {args.input_file!r} does not exist.", file=sys.stderr)
sys.exit(1)
# enforce absolute output directory
out_dir = args.output_dir
if not os.path.isabs(out_dir):
print(f"ERROR: --output-dir must be an absolute path, got {out_dir!r}.", file=sys.stderr)
sys.exit(1)
try:
fvecs_shard(args.input_file, args.shard_size, out_dir)
except Exception as e:
print(f"ERROR: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()