Ok, for anyone watching, here’s the completed script.
My first attempt at Python, so be nice.
All comments/feedback/improvements welcome - I stress that this works on my setup; your results may vary.
Many thanks to Frantz and Jan for your help on this one.
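To run it, pass one or more index names on the command line, something like this (the script and index names here are just examples, use whatever yours are called):

python3 es_dedupe.py graylog_120 graylog_121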
#!/usr/local/bin/python3
# Code borrowed and slightly modified from
# https://alexmarquardt.com/2018/07/23/deduplicating-documents-in-elasticsearch/
## Many thanks to the original author
# Key differences:
## Keys used to compare hashes - Should be generic for any index populated via Filebeat/Logstash/Graylog etc - Use at your own risk!
## Added command line argument to pass in a number of indexes and loop through them one at a time
## Added "trivial" (took some figuring out for a non-python person) delete step with some help from the Graylog Community
## https://community.graylog.org/t/how-to-remove-duplicate-messages-from-an-index/11039
## Sets the index to writeable and back to read only once complete (may be specific to Graylog only)
## Recalculates the index range in Graylog
###########
########### Note that I am not a Python developer (or a real developer at all) so re-use this code at your own risk!
###########
import hashlib, sys, json, requests, datetime
from elasticsearch import Elasticsearch
es = Elasticsearch(["localhost:9200"])
dict_of_duplicate_docs = {}
# The following line defines the fields that will be used to determine if a document is a duplicate
keys_to_include_in_hash = ["@timestamp", "log_file_path", "log_id", "log_offset"]
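# For illustration (made-up values): a document whose _source contains
#   {"@timestamp": "2019-06-12T09:15:00.000Z", "log_file_path": "/var/log/app.log",
#    "log_id": "abc123", "log_offset": "4711"}
# produces the combined key "2019-06-12T09:15:00.000Z/var/log/app.logabc1234711",
# so two documents only hash the same if all four values match exactly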
# Process documents returned by the current search/scroll
def populate_dict_of_duplicate_docs(hits):
    for item in hits:
        combined_key = ""
        for mykey in keys_to_include_in_hash:
            combined_key += str(item['_source'][mykey])
        _id = item["_id"]
        hashval = hashlib.md5(combined_key.encode('utf-8')).digest()
        dict_of_duplicate_docs.setdefault(hashval, []).append(_id)
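# Once every batch has been processed, dict_of_duplicate_docs maps each hash to
# the list of document IDs that produced it, e.g. (made-up values):
#   {b'\x9f\x12...': ['doc_id_1'], b'\x03\xa7...': ['doc_id_2', 'doc_id_3']}
# Any hash with more than one ID is a set of duplicates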
# Loop over all documents in the index, and populate the
# dict_of_duplicate_docs data structure.
def scroll_over_all_docs(index_name):
    data = es.search(index=index_name, scroll='1m', body={"query": {"match_all": {}}})
    # Get the scroll ID
    sid = data['_scroll_id']
    scroll_size = len(data['hits']['hits'])
    # Before scrolling, process the first batch of hits
    populate_dict_of_duplicate_docs(data['hits']['hits'])
    while scroll_size > 0:
        data = es.scroll(scroll_id=sid, scroll='2m')
        # Process the current batch of hits
        populate_dict_of_duplicate_docs(data['hits']['hits'])
        # Update the scroll ID
        sid = data['_scroll_id']
        # Get the number of results returned by the last scroll
        scroll_size = len(data['hits']['hits'])
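    # Optional tidy-up (untested sketch): the scroll context holds resources on
    # the cluster until it times out, so once the loop finishes it can be
    # released explicitly - the elasticsearch-py client exposes this as:
    #   es.clear_scroll(scroll_id=sid)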
def loop_over_hashes_and_remove_duplicates(index_name):
    # Search through the hash of doc values to see if any
    # duplicate hashes have been found
    for hashval, array_of_ids in dict_of_duplicate_docs.items():
        if len(array_of_ids) > 1:
            # For testing, print the hash value
            #print("********** Duplicate docs hash=%s **********" % hashval)
            # Get the documents that have mapped to the current hashval
            matching_docs = es.mget(index=index_name, doc_type="doc", body={"ids": array_of_ids})
            # For testing, print the matching documents - should list the IDs of all documents that match this hash
            #print(matching_docs)
            # Ensure that only copies after the first document are deleted, leaving at least 1
            total_duplicates = len(matching_docs['docs']) - 1
            doc_position = 1
            while (total_duplicates >= doc_position):
                doc = matching_docs['docs'][doc_position]
                # Print the doc details - probably only helpful in testing, otherwise just a waste of CPU
                #print("doc to delete=%s\n" % doc)
                # Delete the duplicate document
                es.delete(index=doc['_index'], id=doc['_id'])
                doc_position = doc_position + 1
                # For testing, force the script to pause on each delete - you can check the results by querying for matching documents
                #input("Press Enter to continue...")
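# Possible improvement (untested sketch): rather than one HTTP round-trip per
# deleted document, the deletes could be batched with the client's bulk helper,
# roughly:
#   from elasticsearch import helpers
#   actions = [{"_op_type": "delete", "_index": doc['_index'], "_id": doc['_id']}
#              for doc in matching_docs['docs'][1:]]
#   helpers.bulk(es, actions)
# (on an ES 6-era client each action may also need "_type": "doc" to match the
# mget call above)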
# The following function is specific to Graylog; it re-calculates the index range
def recalc_index_ranges(api_host, api_token, index_name):
    api_url = api_host + '/api/system/indices/ranges/' + index_name + '/rebuild'
    headers = {'Content-Type': 'application/json', 'X-Requested-By': 'Elastic_DeDupe_Script'}
    # Lazy coding - I'm ignoring SSL errors
    response = requests.post(api_url, headers=headers, auth=(api_token, 'token'), verify=False)
    if response.status_code == 202:
        print("Re-calculate index range request successful\n")
    else:
        print("Unknown error attempting to re-calculate index range\n")
def main(argv):
    print("Time Started: %s" % datetime.datetime.now())
    # Loop through each index name supplied. The first argument is the script name itself, so skip over that one
    # Call the script with arguments like "python yourscriptname.py index1 index2 index3"
    total_indexes = len(argv) - 1
    argv_position = 1
    while (total_indexes >= argv_position):
        index_name = argv[argv_position]
        print("Starting on index: %s\n" % index_name)
        # Set the index to writeable - Graylog sets indexes to read-only once it rolls over to a new one
        # CURL equivalent = curl -XPUT -H "Content-Type: application/json" http://localhost:9200/index_name/_settings -d '{"index.blocks.write": "false"}'
        # Note this also effectively validates the index name passed in: if the index doesn't exist it will throw an error.
        ## If you need to list your current indexes, you can use this command (without the curly brackets) {curl -X GET "localhost:9200/_cat/indices?v"}
        put = es.indices.put_settings(index=index_name, body={"index.blocks.write": "false"})
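        # Elasticsearch answers {"acknowledged": true} when the settings change
        # is accepted - that's what the next check looks for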
        if not put.get('acknowledged'):
            raise Exception('Failed to set writeable index: %s\n\n%s' % (index_name, json.dumps(put)))
        # Perform the de-duplication - clear the duplicate tracker first so
        # hashes from the previous index don't carry over between loop iterations
        dict_of_duplicate_docs.clear()
        scroll_over_all_docs(index_name)
        loop_over_hashes_and_remove_duplicates(index_name)
        # Set the index back to read-only
        put = es.indices.put_settings(index=index_name, body={"index.blocks.write": "true"})
        if not put.get('acknowledged'):
            raise Exception('Failed to set read-only index: %s\n\n%s' % (index_name, json.dumps(put)))
        # Very specific to Graylog: recalculate the index ranges
        ## Reference: http://docs.graylog.org/en/3.0/pages/configuration/rest_api.html#interacting-with-the-graylog-rest-api
        ## Create an API token to use here (following the doc above), and use whatever hostname your Graylog instance responds to
        api_host = 'https://my.server.fqdn:9000'
        api_token = 'zzz_my_token_zzz'
        recalc_index_ranges(api_host, api_token, index_name)
        # Finally, increment the counter for the loop
        argv_position = argv_position + 1
    print("Time Completed: %s" % datetime.datetime.now())

# Execute the script
main(sys.argv)
EDIT: Fixed a typo in the “recalc_index_ranges” function name