How to remove duplicate messages from an index?

Hi all,

I’ve been searching everywhere but can’t find an answer to my issue, so I thought I’d reach out here and see if anyone could help.

I have duplicate messages in an index - I’m not worried about how they got there (a Filebeat+SMB share issue), I’m more worried about “how do I clean it up?”.

I’ve found lots of comments (here and on other forums, including the Elasticsearch one) all pointing to How to Find and Remove Duplicate Documents in Elasticsearch | Elastic Blog as the answer.

Reading that page, I have to go the Python route (no Logstash here), which is fine - I installed Python, installed the elasticsearch module, and the script on that page works fine - I can SEE duplicates.

My issue is with the comment in the article that the delete step is “straightforward”.

I’m not a Python guy (first time I’ve looked at Python code, to be honest) and I’ve tried a multitude of ways that I think this might work, but it turns out the blog writer’s idea of “straightforward” and mine differ greatly…

The best I’ve managed to achieve so far is the following error message:

elasticsearch.exceptions.RequestError: RequestError(400, u'illegal_argument_exception', u'Rejecting mapping update to [<index_name>] as the final mapping would have more than 1 type: [doc, message]')

which I got by adding the following code to the “for doc in matching_docs” loop:

es.delete(index="<index_name>", doc_type="doc", id=doc)

Any smart individuals out there familiar with Python & elasticsearch care to give me a hand with this?

Also, it would be awesome if this could be a UI feature in Graylog :slight_smile:

The line you added seems wrong. Provide an example of the print output and I think I can help you.
It will be something like:

es.delete(index=doc['_index'], doc_type=doc['_type'], id=doc['_id'])

Thanks Frantz,

I tried your line (replacing the quotes with valid characters) but I still get the same error.

Here’s the output from running the script (server & index names modified to protect the innocent, otherwise untouched)

[me@myserver ~]# python dedupe_elastic_index.py
********** Duplicate docs hash=▒ViQ5▒
▒:▒▒sV= **********
doc={u'found': False, u'_type': u'doc', u'_id': u'baac5488-9d19-11e9-94af-005056bc4fb7', u'_index': u'index_name'}

Traceback (most recent call last):
  File "dedupe_elastic_index.py", line 93, in <module>
    main()
  File "dedupe_elastic_index.py", line 90, in main
    loop_over_hashes_and_remove_duplicates()
  File "dedupe_elastic_index.py", line 77, in loop_over_hashes_and_remove_duplicates
    es.delete(index=doc['_index'], doc_type=doc['_type'], id=doc['_id'])
  File "/usr/lib/python2.7/site-packages/elasticsearch/client/utils.py", line 84, in _wrapped
    return func(*args, params=params, **kwargs)
  File "/usr/lib/python2.7/site-packages/elasticsearch/client/__init__.py", line 1384, in delete
    "DELETE", _make_path(index, doc_type, id), params=params
  File "/usr/lib/python2.7/site-packages/elasticsearch/transport.py", line 353, in perform_request
    timeout=timeout,
  File "/usr/lib/python2.7/site-packages/elasticsearch/connection/http_urllib3.py", line 251, in perform_request
    self._raise_error(response.status, raw_data)
  File "/usr/lib/python2.7/site-packages/elasticsearch/connection/base.py", line 178, in _raise_error
    status_code, error_message, additional_info
elasticsearch.exceptions.RequestError: RequestError(400, u'illegal_argument_exception', u'Rejecting mapping update to [index_name] as the final mapping would have more than 1 type: [doc, message]')

Here’s the for loop that is running the delete - Note that the “press enter” is just for my testing so that it doesn’t run forever

        for doc in matching_docs['docs']:
            # In order to remove the possibility of hash collisions,
            # write code here to check all fields in the docs to
            # see if they are truly identical - if so, then execute a
            # DELETE operation on all except one.
            # In this example, we just print the docs.
			print("doc=%s\n" % doc)
			#es.delete(index="index_name", doc_type="doc", id=doc)
			es.delete(index=doc['_index'], doc_type=doc['_type'], id=doc['_id'])
			raw_input("Press Enter to continue...")

Any help would be greatly appreciated! :smile:

Try deleting without indicating the type:

es.delete(index=doc['_index'], id=doc['_id'])

You can also try a simple curl command:

curl -X DELETE "localhost:9200/index_name/_doc/baac5488-9d19-11e9-94af-005056bc4fb7"
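Looking at the error again, “the final mapping would have more than 1 type: [doc, message]” means the index already maps the “message” type, so passing doc_type="doc" makes Elasticsearch think you are adding a second type. A minimal sketch of the loop with the type left out (just a sketch based on your output - es and matching_docs are assumed to be the same objects as in your script):

from elasticsearch import Elasticsearch
es = Elasticsearch(["localhost:9200"])

# ...after building matching_docs with es.mget() as in the blog script...
for doc in matching_docs['docs']:
    # Omit doc_type so the client uses the generic _doc endpoint instead of
    # trying to add a new "doc" type to an index that already maps "message"
    es.delete(index=doc['_index'], id=doc['_id'])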

I can’t help you with the script - but just so you know: after you have deleted messages in the Elasticsearch index, you need to recalculate the index range on the indices via the Graylog API.

Many thanks Frantz, that works :smiley:

Script is running and I can see the “deleted messages” increasing and total messages decreasing. Seems to update every second or so.

I’m just letting the script run for now & testing, I’ll post it up here once complete. One thing I’m not sure on is the delete - I suspect it may actually delete all of the items, rather than “all except the first one” - I guess I’ll find out soon enough…

EDIT - Confirmed, it goes and deletes all of the messages that match the hash, so I’ll have to work out some logic to keep one of them - something like the sketch below.
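One way to keep a single copy (a rough sketch using the same matching_docs variable from the blog script - it keeps whichever document mget returns first and deletes the rest):

# Keep the first document for each hash and delete the remaining copies
for doc in matching_docs['docs'][1:]:
    es.delete(index=doc['_index'], id=doc['_id'])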

Thanks Jan,

While I’m building the script I may as well add that in - do you know offhand the API command (or ideally some Python code) to do the recalc?

EDIT: I found this by using the API browser - it works fine in the browser, but I can’t seem to get a command line version to work:

curl -X POST "https://server.fqdn:9000/api/system/indices/ranges/index_name/rebuild"

I don’t get any output on the command line, and the last update time on the index doesn’t update either.

You can either rebuild the range for all indices or for just one index.

You need to authenticate or use api tokens.

http://docs.graylog.org/en/3.0/pages/configuration/rest_api.html#interacting-with-the-graylog-rest-api

Many thanks! :smiley:

For reference for the other newbies like me - follow the guide Jan linked to for creating the token, and specifically for this use case (re-calculating the index range on a specific index):

curl -u zzztokenidzzz:token -H 'Accept: application/json' -H 'X-Requested-By: Elastic_DeDupe_Script' -X POST "https://server.fqdn:9000/api/system/indices/ranges/index_name/rebuild"

I’ve got my script running great, I’ll integrate this piece and then post the complete thing here.

Ok, for anyone watching, here’s the completed script.

My first attempt at Python, so be nice :smiley_cat:

All comments/feedback/improvements welcome - I stress that this works on my setup, your results may vary.

Many thanks to Frantz and Jan for your help on this one.

#!/usr/local/bin/python3

# Code borrowed and slightly modified from 
# https://alexmarquardt.com/2018/07/23/deduplicating-documents-in-elasticsearch/
## Many thanks to the original author
# Key differences:
## Keys used to compare hashes - Should be generic for any index populated via Filebeat/Logstash/Graylog etc - Use at your own risk!
## Added command line argument to pass in a number of indexes and loop through them one at a time
## Added "trivial" (took some figuring out for a non-python person) delete step with some help from the Graylog Community
## https://community.graylog.org/t/how-to-remove-duplicate-messages-from-an-index/11039
## Sets the index to writeable and back to read only once complete (may be specific to Graylog only)
## Recalculates the index range in Graylog

###########
########### Note that I am not a Python developer (or a real developer at all) so re-use this code at your own risk!
###########

import hashlib, sys, json, requests, datetime
from elasticsearch import Elasticsearch
es = Elasticsearch(["localhost:9200"])
dict_of_duplicate_docs = {}

# The following line defines the fields that will be used to determine if a document is a duplicate
keys_to_include_in_hash = ["@timestamp", "log_file_path", "log_id", "log_offset"]

# Process documents returned by the current search/scroll
def populate_dict_of_duplicate_docs(hits):
    for item in hits:
        combined_key = ""
        for mykey in keys_to_include_in_hash:
            combined_key += str(item['_source'][mykey])
        _id = item["_id"]
        hashval = hashlib.md5(combined_key.encode('utf-8')).digest()
        dict_of_duplicate_docs.setdefault(hashval, []).append(_id)

# Loop over all documents in the index, and populate the
# dict_of_duplicate_docs data structure.
def scroll_over_all_docs(index_name):
    data = es.search(index=index_name, scroll='1m',  body={"query": {"match_all": {}}})
    # Get the scroll ID
    sid = data['_scroll_id']
    scroll_size = len(data['hits']['hits'])
    # Before scroll, process current batch of hits
    populate_dict_of_duplicate_docs(data['hits']['hits'])
    while scroll_size > 0:
        data = es.scroll(scroll_id=sid, scroll='2m')
        # Process current batch of hits
        populate_dict_of_duplicate_docs(data['hits']['hits'])
        # Update the scroll ID
        sid = data['_scroll_id']
        # Get the number of results that returned in the last scroll
        scroll_size = len(data['hits']['hits'])

def loop_over_hashes_and_remove_duplicates(index_name):
    # Search through the hash of doc values to see if any
    # duplicate hashes have been found
    for hashval, array_of_ids in dict_of_duplicate_docs.items():
        if len(array_of_ids) > 1:
            # For testing, print the hash value
            #print("********** Duplicate docs hash=%s **********" % hashval)

            # Get the documents that have mapped to the current hashval
            matching_docs = es.mget(index=index_name, doc_type="doc", body={"ids": array_of_ids})

            # For testing, print the matching documents - Should list all IDs of documents that match this hash
            #print(matching_docs)

            # Ensure that only copies after the first document are deleted, leaving at least 1
            total_duplicates = len(matching_docs['docs']) - 1
            doc_position = 1
            while (total_duplicates >= doc_position):
                doc = matching_docs['docs'][doc_position]
                # Print the doc details - Probably only helpful in testing, otherwise just a waste of CPU
                #print("doc to delete=%s\n" % doc)

                # Delete the duplicate document
                es.delete(index=doc['_index'], id=doc['_id'])
                doc_position = doc_position + 1

                # For testing, force the script to pause on each delete - You can check the results by querying for matching documents
                #raw_input("Press Enter to continue...")

# Following function is specific to Graylog, this will re-calculate the index range
def recalc_index_ranges(api_host, api_token, index_name):
    api_url = api_host + '/api/system/indices/ranges/' + index_name + '/rebuild'
    headers = {'Content-Type': 'application/json', 'X-Requested-By': 'Elastic_DeDupe_Script'}

    # Lazy coding - I'm ignoring SSL errors
    response = requests.post(api_url, headers=headers, auth=(api_token, 'token'), verify=False)

    if response.status_code == 202:
        print("Re-calculate index range request successful\n")
    else:
        print("Unknown error in attempting to re-calculate index range\n")

def main(argv):
    print("Time Started: %s" % datetime.datetime.now())

    # Loop through for each index name supplied. The first argument is the script name itself, so skip over that one
    # Call the script with arguments like "python yourscriptname.py index1 index2 index3"
    total_indexes = len(argv) - 1
    argv_position = 1
    while (total_indexes >= argv_position):
        index_name = argv[argv_position]
        print("Starting on index: %s\n" % index_name)

        # Set the index to writeable - Graylog sets them to read only once it rolls over to a new index
        # CURL equivalent = curl -XPUT -H "Content-Type: application/json" http://localhost:9200/index_name/_settings -d '{"index.blocks.write": "false"}'
        # Note this also effectively validates the index name passed in, as if the index doesn't exist it will throw an error.
        ##  If you need to list your current indexes, you can use this command (without the curly brackets) {curl -X GET "localhost:9200/_cat/indices?v"}
        put = es.indices.put_settings(index=index_name, body={"index.blocks.write": "false"})
        if 'acknowledged' not in put or put['acknowledged'] != True:
            raise Exception('Failed to set writeable index: %s\n\n%s' % (index_name, json.dumps(put)))

        # Perform the de-duplication
        scroll_over_all_docs(index_name)
        loop_over_hashes_and_remove_duplicates(index_name)

        # Set the index back to read-only
        put = es.indices.put_settings(index=index_name, body={"index.blocks.write": "true"})
        if 'acknowledged' not in put or put['acknowledged'] != True:
            raise Exception('Failed to set read-only index: %s\n\n%s' % (index_name, json.dumps(put)))

        # Very specific to Graylog, recalculate the index ranges
        ## Reference: http://docs.graylog.org/en/3.0/pages/configuration/rest_api.html#interacting-with-the-graylog-rest-api
        ## Create an API token to use here (following the doc above), and use whatever hostname your Graylog instance responds to
        api_host = 'https://my.server.fqdn:9000'
        api_token = 'zzz_my_token_zzz'
        recalc_index_ranges(api_host, api_token, index_name)

        # Finally, increment the counter for the loop
        argv_position = argv_position + 1

    print("Time Completed: %s" % datetime.datetime.now())

# Execute script
main(sys.argv)
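For completeness, I call it like this (the index names here are just placeholders - use your own):

python3 dedupe_elastic_index.py graylog_100 graylog_101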

EDIT: Fixed a typo in the “recalc_index_ranges” function name
