#!/usr/bin/env bash
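#
# run_pipeline: deduplicate documents locally in two stages:
#   1. hash_documents hashes every file matching the input globs
#      (one parallel job per glob), writing sharded intermediate files
#   2. dedup_hashes drops duplicate hashes within each shard and writes
#      one CSV of uniques per shard to the output directory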
# Fail fast on errors, unset variables, and pipeline failures
set -euo pipefail
# Required temporary storage
TMP_DIR=$(mktemp -d /tmp/dedup.XXXXXX)
cleanup() {
    rm -rf "${TMP_DIR}"
}
# The EXIT trap also fires when the script dies from SIGINT/SIGTERM, so a
# single EXIT handler suffices and Ctrl-C still terminates the script
trap cleanup EXIT
# Help documentation
usage() {
    echo -e "Usage: $0 [--debug] <input_globs> <output_dir>"
    echo -e "This script runs the record linkage pipeline on your local host.\n"
    echo -e "Params:"
    echo -e "  - <input_globs>\tcomma-separated list of globs to deduplicate; the first pipeline stage is parallelized across globs"
    echo -e "  - <output_dir>\tdirectory to save the deduplicated files to\n"
    echo -e "Example: $0 'input_dir/[a-m]*.txt,input_dir/[n-z]*.txt' output_dir/\n"
}
# Default log level; --debug raises it to info
RUST_LOG="error"
# Parse command line arguments
while [[ "$#" -gt 0 ]]; do
    case "$1" in
        --debug)
            RUST_LOG="info"
            shift
            ;;
        --help)
            usage
            exit 0
            ;;
        *)
            break
            ;;
    esac
done
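# RUST_LOG is the conventional log-level variable for Rust's env_logger;
# exporting it makes it visible to the pipeline binaries invoked below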
export RUST_LOG
# Check for correct number of arguments
if [ "$#" -ne 2 ]; then
    echo "Error: You must provide exactly two parameters."
    usage
    exit 1
fi
# Validate inputs
validate_glob() {
    local pattern="$1"
    # compgen -G exits non-zero when the glob matches nothing; plain array
    # expansion would leave the literal pattern behind unless nullglob is set,
    # so the old count-the-expansion check could never fail
    if ! compgen -G "$pattern" > /dev/null; then
        echo "No files match the glob '$pattern'."
        echo "Please provide a valid glob pattern."
        return 1
    fi
}
validate_directory() {
    if [ ! -d "$1" ]; then
        echo "Error: '$1' is not a valid directory."
        exit 2
    fi
}
# Print status messages in green
log() {
    echo -e "\033[32m${1}\033[0m"
}
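# Split the comma-separated glob list into an array and validate each glob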
IFS=',' read -ra INPUT_GLOBS <<< "$1"
for i in "${INPUT_GLOBS[@]}"; do
    validate_glob "$i"
done
OUTPUT_DIR="$2"
validate_directory "$OUTPUT_DIR"
# Start the pipeline: hash documents, one background job per input glob
log "Hashing documents..."
for GLOB in "${INPUT_GLOBS[@]}"; do
    # Quoted arguments skip shell tilde expansion, so expand a leading ~
    # here; likewise anchor a leading ./ to the working directory (only a
    # ./ prefix, so patterns like ../ or .hidden* are left untouched)
    GLOB="${GLOB/#\~/$HOME}"
    GLOB="${GLOB/#.\//$PWD/}"
    ./target/release/hash_documents "$GLOB" "$TMP_DIR" &
done
wait
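# NOTE: a bare `wait` exits 0 even if a background job failed, so hashing
# errors are not caught by `set -e` here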
log "\nIntermediate hash files (first char of filename is the shard):"
ls "$TMP_DIR/"
# Collect the distinct shard prefixes (first character of each file name)
SHARDS=($(ls "$TMP_DIR/" | cut -c1 | sort -u))
log "\nDeduping files based on their hashes..."
for SHARD in "${SHARDS[@]}"; do
./target/release/dedup_hashes "$TMP_DIR/${SHARD}_*" "$OUTPUT_DIR/$SHARD.csv" &
done
wait
log "\nUniques (first char of the filename is the shard):"
ls "$OUTPUT_DIR"/*.csv
echo ""
log "Run the following command to get uniques: ls $OUTPUT_DIR/*.csv | xargs cat"