EzDevInfo.com

elasticsearch-py

Official Python low-level client for Elasticsearch. Python Elasticsearch Client — Elasticsearch 1.6.0 documentation

Python elasticsearch client returns true when type does not exist

I am trying to delete the mapping of a type but before doing that I am checking if the type exists as in the code below:

def delete_mapping(self, doc_type):
    if self.elasticsearch.indices.exists_type(index='my_index', doc_type=doc_type):
        self.elasticsearch.indices.delete_mapping(index='my_index', doc_type=doc_type)

The line that checks whether the type exists returns true even for a non-existent type, so I get a TypeMissingException on the next line when trying to delete the type's mapping.

When we run the code in the IPython console everything looks fine; the problem occurs when the code runs as part of an async task. It does not happen every time, but randomly on the QA and Production servers. These two environments are the only ones with two Elasticsearch nodes; we don't see the problem in other environments.

I am working with Python 2.7, elasticsearch-py 0.4.4 and Elasticsearch 1.0.1 with two nodes on 64-bit Linux. The code is run as a periodic Celery task.

EDIT: Added more details to the problem.


Source: (StackOverflow)
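
For the check-then-delete pattern in the question above, one possible workaround is to drop the separate existence check and let the delete call tolerate a missing type. This is only a sketch: it assumes the client in use accepts the `ignore` parameter on API calls (whether 0.4.4 behaves exactly like later releases is an assumption) and that a missing type comes back as HTTP 404. It does not explain why `exists_type` returns true, and `delete_mapping_if_present` is a hypothetical helper name.

```python
def delete_mapping_if_present(client, index, doc_type):
    """Delete a type's mapping, treating "type not found" (HTTP 404) as a no-op.

    `client` is expected to be an elasticsearch.Elasticsearch instance.
    """
    # ignore=404 asks the client not to raise when the server answers 404,
    # so there is no earlier exists_type() result that could be stale.
    return client.indices.delete_mapping(index=index, doc_type=doc_type, ignore=404)
```

In the question's class this would be called as `delete_mapping_if_present(self.elasticsearch, 'my_index', doc_type)`.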

Getting Parse error for elasticsearch-py

I am trying to search my entire elasticsearch data for a certain word "tsbu" within a time range. When I try running this, I get a SearchParseException and Parse Failure.

from elasticsearch import Elasticsearch

es = Elasticsearch()

doc = {
        "query": {
            "match" : { 
                "message" : "tsbu"
            }
        }, 
        "range" : { 
            "@timestamp" : { 
                "gte" : "2015-06-09T14:44:00.000Z", 
                "lte" : "2015-06-09T14:50:00.000Z"
            }
        }
    }

print es.search(index="_all", body=doc)

The complete error I get is:

Traceback (most recent call last):
  File "essearch.py", line 22, in <module>
    print es.search(index="_all", body=doc)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/client/utils.py", line 69, in _wrapped
    return func(*args, params=params, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/client/__init__.py", line 504, in search
    params=params, body=body)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/transport.py", line 307, in perform_request
    status, headers, data = connection.perform_request(method, url, params, body, ignore=ignore, timeout=timeout)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/connection/http_urllib3.py", line 89, in perform_request
    self._raise_error(response.status, raw_data)
  File "/usr/local/lib/python2.7/site-packages/elasticsearch/connection/base.py", line 105, in _raise_error
    raise HTTP_EXCEPTIONS.get(status_code, TransportError)(status_code, error_message, additional_info)
elasticsearch.exceptions.RequestError: TransportError(400, u'SearchPhaseExecutionException[Failed to execute phase [query], all shards failed; shardFailures {[mPhuId4qSpa5osrqfeG5Tw][.kibana][0]: SearchParseException[[.kibana][0]: query[message:tsbu],from[-1],size[-1]: Parse Failure [Failed to parse source [{"query": {"match": {"message": "tsbu"}}, "range": {"@timestamp": {"gte": "2015-06-09T14:44:00.000Z", "lte": "2015-06-09T14:50:00.000Z"}}}]]]; nested: SearchParseException[[.kibana][0]: query[message:tsbu],from[-1],size[-1]: Parse Failure [No parser for element [range]]]; }{[mPhuId4qSpa5osrqfeG5Tw][logstash-2015.06.08][0]: SearchParseException[[logstash-2015.06.08][0]: query[message:tsbu],from[-1],size[-1]: Parse Failure [Failed to parse source [{"query": {"match": {"message": "tsbu"}}, "range": {"@timestamp": {"gte": "2015-06-09T14:44:00.000Z", "lte": "2015-06-09T14:50:00.000Z"}}}]]]; nested: SearchParseException[[logstash-2015.06.08][0]: query[message:tsbu],from[-1],size[-1]: Parse Failure [No parser for element [range]]]; }{[mPhuId4qSpa5osrqfeG5Tw][logstash-2015.06.09][0]: SearchParseException[[logstash-2015.06.09][0]: query[message:tsbu],from[-1],size[-1]: Parse Failure [Failed to parse source [{"query": {"match": {"message": "tsbu"}}, "range": {"@timestamp": {"gte": "2015-06-09T14:44:00.000Z", "lte": "2015-06-09T14:50:00.000Z"}}}]]]; nested: SearchParseException[[logstash-2015.06.09][0]: query[message:tsbu],from[-1],size[-1]: Parse Failure [No parser for element [range]]]; }{[mPhuId4qSpa5osrqfeG5Tw][logstash-2015.06.10][0]: SearchParseException[[logstash-2015.06.10][0]: query[message:tsbu],from[-1],size[-1]: Parse Failure [Failed to parse source [{"query": {"match": {"message": "tsbu"}}, "range": {"@timestamp": {"gte": "2015-06-09T14:44:00.000Z", "lte": "2015-06-09T14:50:00.000Z"}}}]]]; nested: SearchParseException[[logstash-2015.06.10][0]: query[message:tsbu],from[-1],size[-1]: Parse Failure [No parser for element [range]]]; 
}{[mPhuId4qSpa5osrqfeG5Tw][logstash-2015.06.08][1]: SearchParseException[[logstash-2015.06.08][1]: query[message:tsbu],from[-1],size[-1]: Parse Failure [Failed to parse source [{"query": {"match": {"message": "tsbu"}}, "range": {"@timestamp": {"gte": "2015-06-09T14:44:00.000Z", "lte": "2015-06-09T14:50:00.000Z"}}}]]]; nested: SearchParseException[[logstash-2015.06.08][1]: query[message:tsbu],from[-1],size[-1]: Parse Failure [No parser for element [range]]]; }{[mPhuId4qSpa5osrqfeG5Tw][logstash-2015.06.09][1]: SearchParseException[[logstash-2015.06.09][1]: query[message:tsbu],from[-1],size[-1]: Parse Failure [Failed to parse source [{"query": {"match": {"message": "tsbu"}}, "range": {"@timestamp": {"gte": "2015-06-09T14:44:00.000Z", "lte": "2015-06-09T14:50:00.000Z"}}}]]]; nested: SearchParseException[[logstash-2015.06.09][1]: query[message:tsbu],from[-1],size[-1]: Parse Failure [No parser for element [range]]]; }{[mPhuId4qSpa5osrqfeG5Tw][logstash-2015.06.10][1]: SearchParseException[[logstash-2015.06.10][1]: query[message:tsbu],from[-1],size[-1]: Parse Failure [Failed to parse source [{"query": {"match": {"message": "tsbu"}}, "range": {"@timestamp": {"gte": "2015-06-09T14:44:00.000Z", "lte": "2015-06-09T14:50:00.000Z"}}}]]]; nested: SearchParseException[[logstash-2015.06.10][1]: query[message:tsbu],from[-1],size[-1]: Parse Failure [No parser for element [range]]]; }{[mPhuId4qSpa5osrqfeG5Tw][logstash-2015.06.08][2]: SearchParseException[[logstash-2015.06.08][2]: query[message:tsbu],from[-1],size[-1]: Parse Failure [Failed to parse source [{"query": {"match": {"message": "tsbu"}}, "range": {"@timestamp": {"gte": "2015-06-09T14:44:00.000Z", "lte": "2015-06-09T14:50:00.000Z"}}}]]]; nested: SearchParseException[[logstash-2015.06.08][2]: query[message:tsbu],from[-1],size[-1]: Parse Failure [No parser for element [range]]]; }{[mPhuId4qSpa5osrqfeG5Tw][logstash-2015.06.09][2]: SearchParseException[[logstash-2015.06.09][2]: query[message:tsbu],from[-1],size[-1]: Parse Failure 
[Failed to parse source [{"query": {"match": {"message": "tsbu"}}, "range": {"@timestamp": {"gte": "2015-06-09T14:44:00.000Z", "lte": "2015-06-09T14:50:00.000Z"}}}]]]; nested: SearchParseException[[logstash-2015.06.09][2]: query[message:tsbu],from[-1],size[-1]: Parse Failure [No parser for element [range]]]; }{[mPhuId4qSpa5osrqfeG5Tw][logstash-2015.06.10][2]: SearchParseException[[logstash-2015.06.10][2]: query[message:tsbu],from[-1],size[-1]: Parse Failure [Failed to parse source [{"query": {"match": {"message": "tsbu"}}, "range": {"@timestamp": {"gte": "2015-06-09T14:44:00.000Z", "lte": "2015-06-09T14:50:00.000Z"}}}]]]; nested: SearchParseException[[logstash-2015.06.10][2]: query[message:tsbu],from[-1],size[-1]: Parse Failure [No parser for element [range]]]; }{[mPhuId4qSpa5osrqfeG5Tw][logstash-2015.06.08][3]: SearchParseException[[logstash-2015.06.08][3]: query[message:tsbu],from[-1],size[-1]: Parse Failure [Failed to parse source [{"query": {"match": {"message": "tsbu"}}, "range": {"@timestamp": {"gte": "2015-06-09T14:44:00.000Z", "lte": "2015-06-09T14:50:00.000Z"}}}]]]; nested: SearchParseException[[logstash-2015.06.08][3]: query[message:tsbu],from[-1],size[-1]: Parse Failure [No parser for element [range]]]; }{[mPhuId4qSpa5osrqfeG5Tw][logstash-2015.06.09][3]: SearchParseException[[logstash-2015.06.09][3]: query[message:tsbu],from[-1],size[-1]: Parse Failure [Failed to parse source [{"query": {"match": {"message": "tsbu"}}, "range": {"@timestamp": {"gte": "2015-06-09T14:44:00.000Z", "lte": "2015-06-09T14:50:00.000Z"}}}]]]; nested: SearchParseException[[logstash-2015.06.09][3]: query[message:tsbu],from[-1],size[-1]: Parse Failure [No parser for element [range]]]; }{[mPhuId4qSpa5osrqfeG5Tw][logstash-2015.06.10][3]: SearchParseException[[logstash-2015.06.10][3]: query[message:tsbu],from[-1],size[-1]: Parse Failure [Failed to parse source [{"query": {"match": {"message": "tsbu"}}, "range": {"@timestamp": {"gte": "2015-06-09T14:44:00.000Z", "lte": 
"2015-06-09T14:50:00.000Z"}}}]]]; nested: SearchParseException[[logstash-2015.06.10][3]: query[message:tsbu],from[-1],size[-1]: Parse Failure [No parser for element [range]]]; }{[mPhuId4qSpa5osrqfeG5Tw][logstash-2015.06.08][4]: SearchParseException[[logstash-2015.06.08][4]: query[message:tsbu],from[-1],size[-1]: Parse Failure [Failed to parse source [{"query": {"match": {"message": "tsbu"}}, "range": {"@timestamp": {"gte": "2015-06-09T14:44:00.000Z", "lte": "2015-06-09T14:50:00.000Z"}}}]]]; nested: SearchParseException[[logstash-2015.06.08][4]: query[message:tsbu],from[-1],size[-1]: Parse Failure [No parser for element [range]]]; }{[mPhuId4qSpa5osrqfeG5Tw][logstash-2015.06.09][4]: SearchParseException[[logstash-2015.06.09][4]: query[message:tsbu],from[-1],size[-1]: Parse Failure [Failed to parse source [{"query": {"match": {"message": "tsbu"}}, "range": {"@timestamp": {"gte": "2015-06-09T14:44:00.000Z", "lte": "2015-06-09T14:50:00.000Z"}}}]]]; nested: SearchParseException[[logstash-2015.06.09][4]: query[message:tsbu],from[-1],size[-1]: Parse Failure [No parser for element [range]]]; }{[mPhuId4qSpa5osrqfeG5Tw][logstash-2015.06.10][4]: SearchParseException[[logstash-2015.06.10][4]: query[message:tsbu],from[-1],size[-1]: Parse Failure [Failed to parse source [{"query": {"match": {"message": "tsbu"}}, "range": {"@timestamp": {"gte": "2015-06-09T14:44:00.000Z", "lte": "2015-06-09T14:50:00.000Z"}}}]]]; nested: SearchParseException[[logstash-2015.06.10][4]: query[message:tsbu],from[-1],size[-1]: Parse Failure [No parser for element [range]]]; }]')

Source: (StackOverflow)
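
The "No parser for element [range]" message in the search question above points at `range` sitting at the top level of the body, next to `query`, where the search API does not expect a query clause. A minimal sketch of one way to combine both conditions is to nest them in a `bool` query; `build_search_body` is a hypothetical helper name, and the field names are taken from the question.

```python
def build_search_body(term, start, end):
    """Return a search body matching `term` in `message` within a time range."""
    return {
        "query": {
            "bool": {
                # Both clauses must match; the range clause now lives
                # inside "query" instead of beside it.
                "must": [
                    {"match": {"message": term}},
                    {"range": {"@timestamp": {"gte": start, "lte": end}}},
                ]
            }
        }
    }


body = build_search_body(
    "tsbu", "2015-06-09T14:44:00.000Z", "2015-06-09T14:50:00.000Z"
)
# With a live cluster and the client from the question:
# es.search(index="_all", body=body)
```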

How to query Elasticsearch with elasticsearch-py to only return data with a specific field

I am trying to figure out a query that returns only documents that have a specific field. Consider the following:

Index netlog-yyyy-mm-dd
(in python)

arch -> dict of information about an archive
  id -> id number of an archive

network -> dict of network logs
  udp -> dict of data
  tcp -> dict of data
  app -> dict of data
  pcap_hash (optional) -> string hash of a pcap if one was captured

So I am trying to build a query that only returns instances where we have a pcap. Based on previous examples I came up with the query below, but it returns all results, not just the ones that have pcap hashes.

from elasticsearch import Elasticsearch

es = Elasticsearch()
# For loop to iterate through a few archive_id's
result = es.search(
    index="netlogs-*",
    doc_type="archived",
    q="(arch.id: \"%s\") AND (network.pcap_hash: \"*\")" % archive_id
)

But, the keys that are returned in network are:

Received archive: 2
[u'udp', u'tcp', u'pcap_hash', u'app']
Received archive: 1
[u'udp', u'tcp', u'app']

Is there a way to do this with Lucene query syntax, or would this require a slightly more complex DSL query?


Source: (StackOverflow)
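
For the field-presence question above, the query string syntax has an `_exists_:` prefix that matches documents in which a field has a value. The sketch below only builds the query string; `build_pcap_query` is a hypothetical helper name, and whether `_exists_` behaves as expected with the asker's Elasticsearch version and mapping is an assumption.

```python
def build_pcap_query(archive_id):
    """Lucene query string: a given archive id AND a present network.pcap_hash."""
    return 'arch.id:"%s" AND _exists_:network.pcap_hash' % archive_id


q = build_pcap_query(2)
# With a live cluster and the client from the question:
# es.search(index="netlogs-*", doc_type="archived", q=q)
```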

Add Timestamp to ElasticSearch with Elasticsearch-py using Bulk-API

I'm trying to add a timestamp to my data, have elasticsearch-py bulk index it, and then display the data with kibana.

My data is showing up in kibana, but my timestamp is not being used. When I go to the "Discovery" tab after configuring my index pattern, I get 0 results (yes, I tried adjusting the search time).

Here is what my bulk index json looks like:

{'index': 
         {'_timestamp': u'2015-08-11 14:18:26', 
          '_type': 'webapp_fingerprint', 
          '_id': u'webapp_id_redacted_2015_08_13_12_39_34',
          '_index': 'webapp_index'
         }
}

****JSON DATA HERE***

This will be accepted by elasticsearch and will get imported into Kibana, but the _timestamp field will not actually be indexed (it does show up in the dropdown when configuring an index pattern under "Time-field name").

I also tried formatting the metaFields like this:

{'index': {
           '_type': 'webapp_fingerprint', 
           '_id': u'webapp_id_redacted_2015_08_13_12_50_04', 
           '_index': 'webapp_index'
          }, 
           'source': {
                      '_timestamp': {
                                     'path': u'2015-08-11 14:18:26',
                                     'enabled': True, 
                                     'format': 'YYYY-MM-DD HH:mm:ss'
                                    }
                     }
}

This also doesn't work.

Finally, I tried including the _timestamp field within the index and applying the format, but I got an error with elasticsearch.

{'index': {
           '_timestamp': {
                          'path': u'2015-08-11 14:18:26', 
                          'enabled': True, 
                          'format': 'YYYY-MM-DD HH:mm:ss'
                         }, 
           '_type': 'webapp_fingerprint', 
           '_id': u'webapp_id_redacted_2015_08_13_12_55_53', 
           '_index': 'webapp_index'
          }
}

The error is:

elasticsearch.exceptions.TransportError: 
TransportError(500,u'IllegalArgumentException[Malformed action/metadata 
line [1], expected a simple value for field [_timestamp] but found [START_OBJECT]]')

Any help someone can provide would be greatly appreciated. I apologize if I haven't explained the issue well enough. Let me know if I need to clarify more. Thanks.


Source: (StackOverflow)
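
For the bulk-indexing question above, one alternative to the `_timestamp` meta-field is to store the time as an ordinary date field inside the document source and select that field in Kibana. This swaps in a different technique from the one the question attempts. In the sketch below, the field name `timestamp`, the helper `build_action` and the sample document are hypothetical; the action layout is the dict form accepted by `elasticsearch.helpers.bulk`.

```python
from datetime import datetime


def build_action(doc, doc_id, raw_time,
                 index="webapp_index", doc_type="webapp_fingerprint"):
    """Build one bulk action that carries the time inside the source."""
    # Parse the "2015-08-11 14:18:26" style string and emit ISO 8601,
    # which dynamic date detection is expected to recognise.
    parsed = datetime.strptime(raw_time, "%Y-%m-%d %H:%M:%S")
    source = dict(doc)  # copy so the caller's dict is not modified
    source["timestamp"] = parsed.isoformat()
    return {"_index": index, "_type": doc_type, "_id": doc_id, "_source": source}


action = build_action({"app": "example"}, "webapp_id_example", "2015-08-11 14:18:26")
# With a live cluster:
# from elasticsearch import Elasticsearch, helpers
# helpers.bulk(Elasticsearch(), [action])
```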

Spark machine learning and Elasticsearch analyzed tokens/text in Python

I'm trying to build an application that indexes a bunch of documents in Elasticsearch and retrieves the documents through Boolean queries into Spark for machine learning. I'm trying to do all of this in Python, through PySpark and elasticsearch-py.

For the machine learning part, I need to create the features using tokens from each of the text documents. To do this, I need to process/analyze each of the documents for typical things like lowercase, stemming, removing stopwords, etc.

So basically I need to turn "Quickly the brown fox is getting away." into something like "quick brown fox get away" or ["quick", "brown", "fox", "get", "away"]. I know you can do this pretty easily through various Python packages and functions, but I want to do this using the Elasticsearch analyzers. Furthermore, I need to do it in a way that is efficient for big datasets.

Basically, I want to pull the analyzed versions of the text or the analyzed tokens directly from Elasticsearch and do it within the Spark framework in an efficient manner. Being a relative ES newcomer, I've figured out how to query documents directly from Spark by adapting the elasticsearch-hadoop plugin through this:

http://blog.qbox.io/elasticsearch-in-apache-spark-python

Basically something like this:

read_conf = {
    'es.nodes': 'localhost',
    'es.port': '9200',
    'es.resource': index_name + '/' + index_type,
    'es.query': '{ "query" : { "match_all" : {} }}',
    } 

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass = 'org.elasticsearch.hadoop.mr.EsInputFormat',
    keyClass = 'org.apache.hadoop.io.NullWritable', 
    valueClass = 'org.elasticsearch.hadoop.mr.LinkedMapWritable', 
    conf = read_conf)

This code will more or less retrieve the unanalyzed original stored version of the text from ES. What I haven't figured out is how to query the analyzed text/tokens in an efficient manner. I've so far figured out two possible ways:

  1. Map the es.termvector() function provided by elasticsearch-py onto each record of the RDD to retrieve the analyzed tokens.
  2. Map the es.indices.analyze() function provided by elasticsearch-py onto each record of the RDD to analyze each record.

See related: Elasticsearch analyze() not compatible with Spark in Python?

From my understanding, both of these methods are extremely inefficient for big datasets because they involve a REST call to ES for each record in the RDD.

Thus, my questions are:

  1. Is there an alternative efficient way I can pull the analyzed text/tokens from ES without making a REST call for each record? Perhaps an ES setting that stores the analyzed text in a field along with the original text? Or the ability to request the analyzed tokens/text within the query itself so that I can just include it in the elasticsearch-hadoop configurations.
  2. Is there an alternative or better solution to my problem that can leverage Spark's parallel machine learning capabilities with an ES-like query/storage/analysis capability?

Source: (StackOverflow)
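
On the per-record REST call concern in the question above, one way to cut the number of round trips is to request term vectors for many documents at once through the multi term vectors API (`mtermvectors` in elasticsearch-py). This is a sketch under assumptions: the response shape (`docs` / `term_vectors` / `terms`) follows the documented API but is not verified against the asker's version, term vectors give the set of indexed terms rather than the original token order, and `chunked` and `tokens_for_ids` are hypothetical helper names.

```python
def chunked(items, size):
    """Yield successive lists of at most `size` items."""
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch


def tokens_for_ids(client, index, doc_type, field, doc_ids, batch_size=100):
    """Map document id -> sorted list of indexed terms, one request per batch."""
    result = {}
    for batch in chunked(doc_ids, batch_size):
        response = client.mtermvectors(
            index=index, doc_type=doc_type, body={"ids": batch}
        )
        for doc in response.get("docs", []):
            # Missing fields or documents simply yield an empty term list.
            terms = doc.get("term_vectors", {}).get(field, {}).get("terms", {})
            result[doc["_id"]] = sorted(terms)
    return result
```

Inside Spark, a function like this could be applied per partition so that each partition issues a handful of batched requests instead of one request per record.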

Elasticsearch analyze() not compatible with Spark in Python?

I'm using the elasticsearch-py client within PySpark using Python 3 and I'm running into a problem using the analyze() function with ES in conjunction with an RDD. In particular, each record in my RDD is a string of text and I'm trying to analyze it to get out the token information, but I'm getting an error when trying to use it within a map function in Spark.

For example, this works perfectly fine:

from elasticsearch import Elasticsearch
es = Elasticsearch()
t = 'the quick brown fox'
es.indices.analyze(text=t)['tokens'][0]

{'end_offset': 3,
'position': 1,
'start_offset': 0,
'token': 'the',
'type': '<ALPHANUM>'}

However, when I try this:

trdd = sc.parallelize(['the quick brown fox'])
trdd.map(lambda x: es.indices.analyze(text=x)['tokens'][0]).collect()

I get a really long error message related to pickling (here's the end of it):

...(self, obj)
    109             if 'recursion' in e.args[0]:
    110                 msg = """Could not pickle object as excessively deep recursion required."""
--> 111                 raise pickle.PicklingError(msg)
    112
    113     def save_memoryview(self, obj):

PicklingError: Could not pickle object as excessively deep recursion required.

I'm not sure what the error means. Am I doing something wrong? Is there a way to map the ES analyze function onto records of an RDD?

Edit: I'm also getting this behavior when applying other functions from elasticsearch-py as well (for example, es.termvector()).


Source: (StackOverflow)
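
A likely (but unconfirmed) reading of the pickling error in the question above is that the lambda captures the `es` client, which Spark then tries to serialize and ship to the workers. A common way around this is to create the client on the worker, once per partition, using `mapPartitions`. The sketch below assumes that reading; `analyze_partition` and its `client_factory` parameter are hypothetical names, and the `analyze(text=...)` call mirrors the one in the question.

```python
def analyze_partition(texts, client_factory=None):
    """Yield the list of analyzed tokens for each text in one partition."""
    if client_factory is None:
        # Imported here so the client class is resolved on the worker
        # rather than being captured from the driver's namespace.
        from elasticsearch import Elasticsearch
        client_factory = Elasticsearch
    client = client_factory()  # one client per partition, built on the worker
    for text in texts:
        response = client.indices.analyze(text=text)
        yield [tok["token"] for tok in response["tokens"]]


# With Spark (sc is the SparkContext from the question):
# trdd = sc.parallelize(['the quick brown fox'])
# trdd.mapPartitions(analyze_partition).collect()
```

This still makes one request per record, so it addresses the serialization problem only, not the request volume.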

python elasticsearch client set mappings during create index

I can set the mappings of an index when creating it with a curl command like this:

{  
  "mappings":{  
    "logs_june":{  
      "_timestamp":{  
        "enabled":"true"
      },
      "properties":{  
        "logdate":{  
          "type":"date",
          "format":"dd/MM/yyy HH:mm:ss"
        }
      }
    }
  }
}

But I need to create that index and set its mappings with the Elasticsearch client in Python. What is the way to do that? I tried the code below, but it does not work:

self.elastic_con = Elasticsearch([host], verify_certs=True)
self.elastic_con.indices.create(index="accesslog", ignore=400)
params = "{\"mappings\":{\"logs_june\":{\"_timestamp\": {\"enabled\": \"true\"},\"properties\":{\"logdate\":{\"type\":\"date\",\"format\":\"dd/MM/yyy HH:mm:ss\"}}}}}"
self.elastic_con.indices.put_mapping(index="accesslog",body=params)

Source: (StackOverflow)
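
For the index-creation question above, `indices.create` accepts a `body`, so the same JSON that is sent with curl can be passed as a dict when the index is created. The following is a sketch, not a confirmed fix for the asker's setup; `INDEX_BODY` and `create_index_with_mappings` are hypothetical names, and the mapping is transcribed from the question.

```python
INDEX_BODY = {
    "mappings": {
        "logs_june": {
            "_timestamp": {"enabled": True},
            "properties": {
                # Format string copied as-is from the question.
                "logdate": {"type": "date", "format": "dd/MM/yyy HH:mm:ss"}
            },
        }
    }
}


def create_index_with_mappings(client, index="accesslog", body=INDEX_BODY):
    """Create the index and its mappings in a single call."""
    # ignore=400 keeps the question's behaviour of not raising on a 400,
    # for example when the index already exists.
    return client.indices.create(index=index, body=body, ignore=400)
```

Note that with `ignore=400` an already existing index is left untouched, so the mappings in `body` are not applied in that case.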

How to update a document using elasticsearch-py?

Does anyone have an example of how to use update? It's documented here, but the documentation is unclear and doesn't include a working example. I've tried the following:

from elasticsearch import Elasticsearch

coll = Elasticsearch()
coll.update(index='stories-test',doc_type='news',id=hit.meta.id,
                body={"stanford": 1, "parsed_sents": parsed })

and I get

elasticsearch.exceptions.RequestError: 
TransportError(400, u'ActionRequestValidationException[Validation Failed: 1: script or doc is missing;]')

I would like to update using a partial doc, but the update method doesn't take any argument named 'doc' or 'document'.


Source: (StackOverflow)
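
The "script or doc is missing" message in the update question above suggests that the partial document has to be wrapped under a top-level `doc` key in the request body, rather than passed as a separate argument. A minimal sketch, with `update_partial` as a hypothetical helper name:

```python
def update_partial(client, index, doc_type, doc_id, fields):
    """Apply a partial-document update by wrapping the fields under "doc"."""
    return client.update(
        index=index, doc_type=doc_type, id=doc_id, body={"doc": fields}
    )
```

With the names from the question this would be `update_partial(coll, 'stories-test', 'news', hit.meta.id, {"stanford": 1, "parsed_sents": parsed})`.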