
Splunk Integration

Enrich your Splunk security events with real-time IP threat intelligence from ipinsights.io.

Overview

Splunk is a leading data analytics and SIEM (Security Information and Event Management) platform used by security teams worldwide for log management, real-time monitoring, advanced threat detection, and incident investigation. Its powerful Search Processing Language (SPL) and extensible app framework make it ideal for integrating external threat intelligence feeds.

By integrating ipinsights.io you can automatically enrich every Splunk event containing an IP address with real-time threat intelligence — including reputation scores, geolocation, ASN data, Tor/proxy/VPN detection and blocklist membership — giving your SOC analysts instant context directly within Splunk dashboards and searches.

The integration uses a custom search command built with the Splunk SDK for Python, or alternatively a scripted lookup, to seamlessly call the ipinsights.io API at search time and enrich events with threat data. Results are cached in a KV store collection for performance and visualised on custom dashboards.

Architecture Overview

The diagram below shows the end-to-end data flow when security events are enriched with threat intelligence:

┌──────────────────┐   raw events   ┌──────────────────────┐
│  Data Sources    │ ─────────────▶ │   Splunk Indexer     │
│ (firewalls, IDS, │                │  (index=security)    │
│  web servers)    │                └──────────┬───────────┘
└──────────────────┘                           │ SPL search
                                               ▼
              ┌──────────────────┐  API call (HTTP GET)  ┌──────────────────────┐
              │  Custom Search   │ ────────────────────▶ │  ipinsights.io API   │
              │  Command or      │ ◀──────────────────── │  /api/v1/lookup      │
              │  Scripted Lookup │     JSON response     └──────────────────────┘
              └────────┬─────────┘
                       │ enriched results
                       ▼
              ┌──────────────────────────┐
              │  KV Store Lookup Cache   │
              │  + Dashboard Panels      │
              │  + Alerts & Reports      │
              └──────────────────────────┘
  1. Data sources (firewalls, IDS/IPS, web servers, etc.) send raw events to the Splunk indexer.
  2. A scheduled or ad-hoc SPL search extracts unique source IP addresses from security events.
  3. The custom search command (or scripted lookup) calls the ipinsights.io API for each unique IP.
  4. The JSON response containing threat score, geolocation, ASN data and blocklist status is parsed and merged into the event.
  5. Enriched results are written to a KV store collection for caching and fast lookups.
  6. Custom dashboards, alerts and reports visualise the enriched data for SOC analysts.
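Step 4 of the flow above can be sketched in plain Python. The field names (threat_assessment.score, country, is_tor, and so on) are the ones the custom command in Step 1 reads; the sample payload itself is illustrative, not a real API response:

```python
# Sketch of the parse-and-merge step. The payload shape mirrors the fields
# the custom command reads; the values below are illustrative only.
sample_response = {
    "success": True,
    "data": {
        "threat_assessment": {"score": 87, "level": "high"},
        "country": "NL",
        "isp": "Example Hosting BV",  # hypothetical ISP name
        "is_tor": True,
        "is_proxy": False,
        "is_vpn": False,
    },
}

def merge_enrichment(event: dict, body: dict) -> dict:
    """Merge API enrichment fields into a Splunk-style event dict."""
    if not (body.get("success") and "data" in body):
        return event  # leave the event untouched on API failure
    data = body["data"]
    threat = data.get("threat_assessment", {})
    event.update({
        "threat_score": threat.get("score", ""),
        "threat_level": threat.get("level", ""),
        "ip_country": data.get("country", ""),
        "ip_isp": data.get("isp", ""),
        # Booleans become lowercase strings so SPL comparisons like
        # is_tor="true" work as shown later in this guide.
        "is_tor": str(data.get("is_tor", False)).lower(),
        "is_proxy": str(data.get("is_proxy", False)).lower(),
        "is_vpn": str(data.get("is_vpn", False)).lower(),
    })
    return event

event = merge_enrichment({"src_ip": "203.0.113.7"}, sample_response)
```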

Prerequisites

  • Splunk Enterprise 8.x+ or Splunk Cloud
  • Admin access to the Splunk instance (required for installing apps and custom commands)
  • Python 3 available on all search heads (Splunk 8.x+ ships with Python 3 by default)
  • The Splunk SDK for Python (pip install splunk-sdk; bundled into the app's lib/ directory in Step 2)
  • An ipinsights.io API key — available on your profile page (or register for free)
  • Outbound HTTPS (port 443) access from Splunk search heads to https://ipinsights.io

Step 1 — Create a Custom Search Command

The heart of the integration is a custom streaming search command that takes an IP field name as an argument, calls the ipinsights.io API for each unique IP, and appends enrichment fields to every event.

Python Script — ipinsights_lookup.py

Create the following Python script in the app's bin/ directory. It uses the Splunk SDK's StreamingCommand to process events in a streaming fashion:

#!/usr/bin/env python3
"""
ipinsights_lookup.py — Splunk custom search command

Enriches events with IP threat intelligence from ipinsights.io.

Usage in SPL:
    index=firewall | ipinsightslookup ip_field=src_ip
"""
import sys
import os
import json
import time

sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'lib'))

from splunklib.searchcommands import (
    dispatch, StreamingCommand, Configuration, Option, validators
)

try:
    from urllib.request import Request, urlopen
    from urllib.error import URLError, HTTPError
except ImportError:
    from urllib2 import Request, urlopen, URLError, HTTPError

API_BASE = 'https://ipinsights.io/api/v1/lookup'
CACHE_TTL = 3600  # Cache results for 1 hour


@Configuration()
class IPInsightsLookupCommand(StreamingCommand):
    """Enrich events with IP threat intelligence from ipinsights.io."""

    ip_field = Option(
        doc='Name of the field containing the IP address',
        require=True,
        validate=validators.Fieldname()
    )
    api_key = Option(
        doc='API key for ipinsights.io (overrides app config)',
        require=False,
        default=None
    )

    # In-memory cache shared across events in the same search
    _cache = {}

    def _get_api_key(self):
        """Retrieve API key from option or Splunk storage/passwords."""
        if self.api_key:
            return self.api_key
        # Fall back to passwords.conf credential
        try:
            storage = self.service.storage_passwords
            for credential in storage:
                if credential.realm == 'ipinsights_ta' and credential.username == 'api_key':
                    return credential.clear_password
        except Exception:
            pass
        return os.environ.get('IPINSIGHTS_API_KEY', '')

    def _lookup_ip(self, ip, key):
        """Call the API and return the parsed JSON response, with caching."""
        now = time.time()
        if ip in self._cache and (now - self._cache[ip]['ts']) < CACHE_TTL:
            return self._cache[ip]['data']
        url = '{}?ip={}'.format(API_BASE, ip)
        req = Request(url)
        req.add_header('X-API-Key', key)
        req.add_header('Accept', 'application/json')
        req.add_header('User-Agent', 'Splunk-IPInsights/1.0')
        try:
            resp = urlopen(req, timeout=10)
            body = json.loads(resp.read().decode('utf-8'))
            if body.get('success') and 'data' in body:
                self._cache[ip] = {'data': body['data'], 'ts': now}
                return body['data']
        except (HTTPError, URLError, ValueError) as exc:
            self.logger.warning('ipinsights_lookup: API error for %s: %s', ip, exc)
        return None

    def stream(self, records):
        key = self._get_api_key()
        if not key:
            self.logger.error('ipinsights_lookup: No API key configured')
        for record in records:
            ip = record.get(self.ip_field)
            if ip and key:
                data = self._lookup_ip(ip, key)
                if data:
                    threat = data.get('threat_assessment', {})
                    record['threat_score'] = threat.get('score', '')
                    record['threat_level'] = threat.get('level', '')
                    record['ip_country'] = data.get('country', '')
                    record['ip_isp'] = data.get('isp', '')
                    record['is_tor'] = str(data.get('is_tor', False)).lower()
                    record['is_proxy'] = str(data.get('is_proxy', False)).lower()
                    record['is_vpn'] = str(data.get('is_vpn', False)).lower()
                    record['ip_city'] = data.get('city', '')
                    record['ip_asn'] = data.get('asn', '')
                    record['ip_org'] = data.get('org', '')
            yield record


dispatch(IPInsightsLookupCommand, sys.argv, sys.stdin, sys.stdout, __name__)

The command caches results in memory for the duration of the search to avoid redundant API calls for the same IP address. Adjust CACHE_TTL as needed.
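The TTL check can be isolated into a few lines if you want to reason about it or tune it. This is a minimal sketch of the same pattern the command uses (a cache hit only when the entry is younger than the TTL), with the clock passed in explicitly so the expiry path is easy to exercise:

```python
CACHE_TTL = 3600  # seconds, as in the command

def cache_get(cache: dict, ip: str, now: float, ttl: int = CACHE_TTL):
    """Return cached data for ip, or None if absent or older than ttl."""
    entry = cache.get(ip)
    if entry and (now - entry["ts"]) < ttl:
        return entry["data"]
    return None

def cache_put(cache: dict, ip: str, data: dict, now: float) -> None:
    """Store data for ip with the current timestamp."""
    cache[ip] = {"data": data, "ts": now}

cache = {}
cache_put(cache, "198.51.100.9", {"threat_score": 12}, now=1000.0)
fresh = cache_get(cache, "198.51.100.9", now=1000.0 + 10)    # within TTL: hit
stale = cache_get(cache, "198.51.100.9", now=1000.0 + 7200)  # past TTL: miss
```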

Step 2 — Create the App Directory Structure

Splunk apps follow a standard directory layout. Create the following structure under your Splunk installation:

$SPLUNK_HOME/etc/apps/ipinsights_ta/
├── default/
│   ├── app.conf              # App metadata
│   ├── commands.conf         # Custom search command registration
│   ├── transforms.conf       # Scripted lookup definition (optional)
│   ├── collections.conf      # KV store collection definition
│   └── savedsearches.conf    # Scheduled enrichment search
├── bin/
│   └── ipinsights_lookup.py  # Custom search command script
├── lib/
│   └── splunklib/            # Splunk SDK for Python (bundled)
├── local/                    # Local overrides (auto-created)
├── lookups/                  # Lookup table files
└── metadata/
    └── default.meta          # Permissions metadata

Create the Structure

cd $SPLUNK_HOME/etc/apps
mkdir -p ipinsights_ta/{default,bin,lib,local,lookups,metadata}

# Copy the Python script
cp ipinsights_lookup.py ipinsights_ta/bin/

# Bundle the Splunk SDK
pip install splunk-sdk --target=ipinsights_ta/lib/

# Set permissions
chmod 755 ipinsights_ta/bin/ipinsights_lookup.py
chown -R splunk:splunk ipinsights_ta/

Step 3 — Configure the App

default/app.conf

Define the app metadata so Splunk recognises it on the Apps page:

[install]
is_configured = true
build = 1

[ui]
is_visible = false
label = IP Insights Threat Intelligence

[launcher]
author = ipinsights.io
description = Enrich Splunk events with IP threat intelligence from ipinsights.io. Adds reputation scores, geolocation, ASN data and anonymiser detection.
version = 1.0.0

default/commands.conf

Register the custom search command so it can be invoked in SPL:

[ipinsightslookup]
filename = ipinsights_lookup.py
chunked = true
python.version = python3

default/collections.conf

Define a KV store collection for caching enrichment results:

[ipinsights_cache]
field.ip = string
field.threat_score = number
field.threat_level = string
field.country = string
field.isp = string
field.is_tor = string
field.is_proxy = string
field.is_vpn = string
field.city = string
field.asn = number
field.org = string
field.updated_at = time

metadata/default.meta

Grant global read access so the command is available across all apps:

[]
export = system

[commands]
export = system

Store the API Key Securely

Use Splunk's credential storage (passwords.conf) to keep your API key encrypted at rest:

# Via the Splunk CLI
$SPLUNK_HOME/bin/splunk add credentials \
    -realm ipinsights_ta \
    -username api_key \
    -password "YOUR_API_KEY_HERE"

# Or via the REST API
curl -k -u admin:changeme \
    https://localhost:8089/servicesNS/nobody/ipinsights_ta/storage/passwords \
    -d name=api_key \
    -d realm=ipinsights_ta \
    -d password="YOUR_API_KEY_HERE"

Never hard-code your API key in scripts. The custom command retrieves it from storage/passwords at runtime.
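The lookup order the command's _get_api_key method implements (SPL option first, then storage/passwords, then the IPINSIGHTS_API_KEY environment variable) can be expressed as a small pure function. A sketch of that precedence, not the Splunk credential API itself:

```python
def resolve_api_key(option_key, stored_key, env_key):
    """Mirror the command's precedence: the SPL api_key option wins,
    then the storage/passwords credential, then the environment
    variable, and finally an empty string (no key configured)."""
    return option_key or stored_key or env_key or ""

# The SPL option is unset, so the stored credential is used.
key = resolve_api_key(None, "stored-secret", "env-secret")
```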

Step 4 — Alternative: Scripted Lookup

If you prefer Splunk's lookup mechanism over a custom search command, you can create a scripted lookup that is invoked automatically when you reference it in SPL with the lookup command.

default/transforms.conf

[ipinsights_scripted_lookup]
external_cmd = ipinsights_scripted.py ip
fields_list = ip, threat_score, threat_level, country, isp, is_tor, is_proxy
python.version = python3

Python Script — bin/ipinsights_scripted.py

The scripted lookup reads from stdin and writes enriched CSV rows to stdout:

#!/usr/bin/env python3
"""
ipinsights_scripted.py — Splunk scripted lookup

Reads IP addresses from stdin, enriches via the API, writes CSV to stdout.
"""
import csv
import json
import os
import sys

try:
    from urllib.request import Request, urlopen
    from urllib.error import URLError, HTTPError
except ImportError:
    from urllib2 import Request, urlopen, URLError, HTTPError

API_BASE = 'https://ipinsights.io/api/v1/lookup'
API_KEY = os.environ.get('IPINSIGHTS_API_KEY', '')

FIELDS_OUT = [
    'ip', 'threat_score', 'threat_level', 'country', 'isp', 'is_tor', 'is_proxy'
]


def lookup_ip(ip):
    """Call the API and return a dict of enrichment fields."""
    url = '{}?ip={}'.format(API_BASE, ip)
    req = Request(url)
    req.add_header('X-API-Key', API_KEY)
    req.add_header('Accept', 'application/json')
    req.add_header('User-Agent', 'Splunk-IPInsights-Lookup/1.0')
    try:
        resp = urlopen(req, timeout=10)
        body = json.loads(resp.read().decode('utf-8'))
        if body.get('success') and 'data' in body:
            d = body['data']
            threat = d.get('threat_assessment', {})
            return {
                'ip': ip,
                'threat_score': str(threat.get('score', '')),
                'threat_level': threat.get('level', ''),
                'country': d.get('country', ''),
                'isp': d.get('isp', ''),
                'is_tor': str(d.get('is_tor', False)).lower(),
                'is_proxy': str(d.get('is_proxy', False)).lower(),
            }
    except (HTTPError, URLError, ValueError):
        pass
    return {f: '' for f in FIELDS_OUT}


def main():
    reader = csv.DictReader(sys.stdin)
    writer = csv.DictWriter(sys.stdout, fieldnames=FIELDS_OUT)
    writer.writeheader()
    for row in reader:
        ip = row.get('ip', '').strip()
        if ip:
            result = lookup_ip(ip)
            writer.writerow(result)
        else:
            writer.writerow({f: '' for f in FIELDS_OUT})


if __name__ == '__main__':
    main()
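You can exercise the CSV contract outside Splunk by piping a header row plus IP rows through the same loop. The sketch below reproduces the script's CSV-in/CSV-out shape with a stubbed lookup (the stub and its values are illustrative), so no network access or API key is needed:

```python
import csv
import io

FIELDS_OUT = ['ip', 'threat_score', 'threat_level', 'country', 'isp',
              'is_tor', 'is_proxy']

def fake_lookup(ip):
    # Stub standing in for the real API call; values are illustrative.
    return {'ip': ip, 'threat_score': '12', 'threat_level': 'low',
            'country': 'US', 'isp': 'ExampleNet',
            'is_tor': 'false', 'is_proxy': 'false'}

def run_lookup(stdin, stdout, lookup=fake_lookup):
    """Same protocol as the scripted lookup: CSV rows in, CSV rows out."""
    reader = csv.DictReader(stdin)
    writer = csv.DictWriter(stdout, fieldnames=FIELDS_OUT)
    writer.writeheader()
    for row in reader:
        ip = (row.get('ip') or '').strip()
        writer.writerow(lookup(ip) if ip else {f: '' for f in FIELDS_OUT})

out = io.StringIO()
run_lookup(io.StringIO("ip\n8.8.8.8\n"), out)
rows = list(csv.DictReader(io.StringIO(out.getvalue())))
```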

Using the Scripted Lookup in SPL

index=firewall sourcetype=cisco:asa
| dedup src_ip
| rename src_ip AS ip
| lookup ipinsights_scripted_lookup ip
| table ip, threat_score, threat_level, country, isp, is_tor, is_proxy

Step 5 — Create a Scheduled Search for Enrichment

Instead of enriching at search time (which can be slow for large result sets), create a scheduled search that runs periodically, enriches unique IPs, and stores the results in the KV store for fast lookups.

SPL — Scheduled Enrichment Search

| tstats dc(host) AS host_count WHERE index=security BY src_ip
| dedup src_ip
| rename src_ip AS ip
| lookup ipinsights_cache ip OUTPUT threat_score AS cached_score
| where isnull(cached_score)
| ipinsightslookup ip_field=ip
| eval updated_at=now()
| outputlookup ipinsights_cache append=true key_field=ip

This search finds unique source IPs from your security index, filters out IPs that are already in the cache, enriches the remainder via the custom command, and writes the results back to the KV store.

default/savedsearches.conf

[IPInsights - Enrich New IPs]
search = | tstats dc(host) AS host_count WHERE index=security BY src_ip \
    | dedup src_ip \
    | rename src_ip AS ip \
    | lookup ipinsights_cache ip OUTPUT threat_score AS cached_score \
    | where isnull(cached_score) \
    | ipinsightslookup ip_field=ip \
    | eval updated_at=now() \
    | outputlookup ipinsights_cache append=true key_field=ip
cron_schedule = */15 * * * *
enableSched = 1
dispatch.earliest_time = -15m
dispatch.latest_time = now
description = Enrich new IP addresses with ipinsights.io threat intelligence every 15 minutes.
disabled = false

Cache Expiry Search

Add a second scheduled search to expire stale cache entries (e.g. older than 24 hours):

[IPInsights - Expire Stale Cache]
search = | inputlookup ipinsights_cache \
    | where updated_at > relative_time(now(), "-24h") \
    | outputlookup ipinsights_cache
cron_schedule = 0 * * * *
enableSched = 1
description = Remove cached entries older than 24 hours.
disabled = false

Step 6 — Create Dashboard Panels

Use the cached enrichment data to build informative dashboard panels. Below are SPL queries for common visualisations.

High-Risk IPs Table

Display a table of IPs with the highest threat scores:

| inputlookup ipinsights_cache
| where threat_score > 50
| sort - threat_score
| table ip, threat_score, threat_level, country, isp, is_tor, is_proxy
| head 50
| rename ip AS "IP Address", threat_score AS "Threat Score",
         threat_level AS "Risk Level", country AS "Country", isp AS "ISP",
         is_tor AS "Tor Node", is_proxy AS "Proxy"

Threat Score Distribution

Show the distribution of threat scores across all enriched IPs:

| inputlookup ipinsights_cache
| eval score_bucket = case(
    threat_score <= 20, "Low (0–20)",
    threat_score <= 50, "Medium (21–50)",
    threat_score <= 70, "High (51–70)",
    threat_score > 70, "Critical (71–100)",
    true(), "Unknown")
| stats count BY score_bucket
| sort score_bucket
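The same bucketing, expressed in Python, can be handy when post-processing exported results; the bucket edges below are simply those used in the SPL case() above:

```python
def score_bucket(score):
    """Mirror the SPL case() buckets for a numeric threat score."""
    if score is None:
        return "Unknown"
    if score <= 20:
        return "Low (0–20)"
    if score <= 50:
        return "Medium (21–50)"
    if score <= 70:
        return "High (51–70)"
    return "Critical (71–100)"

# One sample score from each bucket.
buckets = [score_bucket(s) for s in (5, 35, 64, 90)]
```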

Top Countries Chart

Visualise the top source countries for threat IPs:

| inputlookup ipinsights_cache
| where threat_score > 30
| stats count BY country
| sort - count
| head 20

Tor / Proxy Detection Panel

Identify IPs using anonymisation services:

| inputlookup ipinsights_cache
| where is_tor="true" OR is_proxy="true" OR is_vpn="true"
| eval anonymiser_type = case(
    is_tor="true", "Tor Exit Node",
    is_proxy="true", "Proxy",
    is_vpn="true", "VPN",
    true(), "Unknown")
| stats count BY anonymiser_type
| append [
    | inputlookup ipinsights_cache
    | where is_tor="true" OR is_proxy="true" OR is_vpn="true"
    | eval anonymiser_type = case(
        is_tor="true", "Tor Exit Node",
        is_proxy="true", "Proxy",
        is_vpn="true", "VPN",
        true(), "Unknown")
    | table ip, anonymiser_type, threat_score, country
    | sort - threat_score ]

Step 7 — Set Up Alerts

Create a Splunk alert that triggers whenever a high-risk IP (threat score above 70) is detected in your security events.

Alert SPL Query

index=security sourcetype=*
| ipinsightslookup ip_field=src_ip
| where threat_score > 70
| table _time, src_ip, dest_ip, threat_score, threat_level, country, is_tor, is_proxy
| sort - threat_score

savedsearches.conf Snippet

[IPInsights - High Risk IP Alert]
search = index=security sourcetype=* \
    | ipinsightslookup ip_field=src_ip \
    | where threat_score > 70 \
    | table _time, src_ip, dest_ip, threat_score, threat_level, country, is_tor, is_proxy \
    | sort - threat_score
cron_schedule = */5 * * * *
enableSched = 1
dispatch.earliest_time = -5m
dispatch.latest_time = now
alert.severity = 4
alert.suppress = 1
alert.suppress.period = 1h
alert.suppress.fields = src_ip
alert_type = number of events
alert_comparator = greater than
alert_threshold = 0
action.email = 1
action.email.to = soc@yourcompany.com
action.email.subject = [ipinsights.io] High-Risk IP Detected
action.email.message.alert = A source IP with a threat score above 70 has been detected.
description = Fires when a source IP with a threat score above 70 is detected in security events.
disabled = false

Alert Configuration Options

  • Trigger condition: When the number of results is greater than 0.
  • Throttle: Suppress alerts for the same src_ip for 1 hour to reduce noise.
  • Severity: Set to Critical (level 4) for high-risk IPs.
  • Actions: Send email, create a notable event (Splunk ES), or trigger a webhook to your ticketing system.
  • Adjust the threat_score threshold to match your risk appetite (e.g. 50 for medium, 80 for critical-only).

Verification

After deploying the app and restarting Splunk, verify the integration is working correctly:

Test the Custom Command

Run a simple search in the Splunk Search & Reporting app:

| makeresults
| eval ip="8.8.8.8"
| ipinsightslookup ip_field=ip
| table ip, threat_score, threat_level, ip_country, ip_isp, is_tor, is_proxy

You should see enrichment fields populated with threat intelligence data for the test IP.

Check the KV Store

Verify cached results exist in the KV store collection:

| inputlookup ipinsights_cache
| stats count
| eval status=if(count > 0,
    "KV store populated (" . count . " entries)",
    "KV store is empty — run the scheduled search manually")
| table status

Validate Enriched Data

Join live security events with cached enrichment data:

index=security earliest=-1h
| dedup src_ip
| rename src_ip AS ip
| lookup ipinsights_cache ip
| where isnotnull(threat_score)
| table ip, threat_score, threat_level, country, isp, is_tor
| sort - threat_score

Best Practices

  • Batch processing & rate limiting: Always dedup IPs before calling the custom command to minimise API calls. The scheduled search approach ensures you only look up each IP once.
  • KV store caching: Use the KV store as your primary lookup source for dashboards and ad-hoc searches. Reserve real-time API calls for scheduled enrichment only.
  • Error handling: The custom command logs warnings for failed API calls. Monitor index=_internal sourcetype=splunkd component=ExternalSearchCommand for errors.
  • API key security: Store your API key in Splunk's storage/passwords (encrypted at rest), never in plaintext configuration files or environment variables on shared systems.
  • Dedicated search heads: For large deployments, run the enrichment scheduled search on a dedicated search head to avoid impacting interactive users.
  • Index-time vs. search-time: Prefer search-time enrichment (as shown in this guide) over index-time to keep raw data intact and allow the enrichment to be updated as threat intelligence evolves.
  • Cache TTL: Set an appropriate cache expiry (e.g. 24 hours) to balance freshness with API usage. High-frequency environments may benefit from shorter TTLs.
  • Splunk Cloud: If running on Splunk Cloud, submit the app via the App Inspect process and ensure your inputs.conf does not use file-based inputs.
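If bulk enrichment does hit the API rate limit, a simple exponential backoff keeps the scheduled search resilient. The sketch below assumes the API signals rate limiting with HTTP 429 (the status mentioned in Troubleshooting); the base delay and cap are tunable assumptions, and the sleep function is injected so the schedule can be verified without waiting:

```python
def call_with_backoff(fn, retries=5, base=1.0, cap=60.0, sleep=None):
    """Call fn() until it returns a non-429 status, backing off
    base * 2^attempt seconds (capped) between rate-limited attempts.
    fn must return a (status, data) tuple."""
    sleep = sleep or (lambda s: None)
    for attempt in range(retries):
        status, data = fn()
        if status != 429:
            return data
        sleep(min(base * (2 ** attempt), cap))
    return None  # gave up after `retries` rate-limited attempts

# Stubbed API: rate-limited twice, then a successful response.
responses = iter([(429, None), (429, None), (200, {"threat_score": 5})])
slept = []
data = call_with_backoff(lambda: next(responses), sleep=slept.append)
```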

Troubleshooting

Custom command not found

  • Verify the app directory exists at $SPLUNK_HOME/etc/apps/ipinsights_ta/.
  • Check that commands.conf is in the default/ directory and the filename matches the script.
  • Restart Splunk after deploying the app: $SPLUNK_HOME/bin/splunk restart.
  • Check btool: $SPLUNK_HOME/bin/splunk btool commands list ipinsightslookup --debug.

Python path or import errors

  • Ensure the Splunk SDK for Python is bundled in the lib/ directory of the app.
  • Verify Python 3 is configured: check python.version = python3 in commands.conf.
  • Test the script directly: $SPLUNK_HOME/bin/splunk cmd python3 $SPLUNK_HOME/etc/apps/ipinsights_ta/bin/ipinsights_lookup.py.
  • Check index=_internal "ipinsights_lookup" ERROR for detailed error messages.

Permission denied errors

  • Ensure the script is readable and executable by the Splunk user: chmod 755 bin/ipinsights_lookup.py.
  • Check ownership: chown -R splunk:splunk $SPLUNK_HOME/etc/apps/ipinsights_ta/.
  • On SELinux systems, verify the correct context: ls -Z bin/ipinsights_lookup.py.

API connectivity issues

  • Test from the search head: curl -H "X-API-Key: YOUR_KEY" "https://ipinsights.io/api/v1/lookup?ip=8.8.8.8".
  • Check firewall/proxy rules allowing outbound HTTPS to https://ipinsights.io.
  • Look for timeout errors in index=_internal sourcetype=splunkd "ipinsights" "timeout".
  • If behind a corporate proxy, set HTTP_PROXY / HTTPS_PROXY environment variables for the Splunk process.

Slow searches or timeouts

  • Always dedup IPs before calling the custom command to reduce API calls.
  • Use the KV store cache — run | lookup ipinsights_cache ip instead of calling the API at search time.
  • Increase the scheduled search frequency if the cache is frequently empty.
  • Check your API rate limit — if you're hitting 429 responses, request a higher limit below.

Missing or empty enrichment fields

  • Verify the IP field name matches exactly (field names are case-sensitive in Splunk).
  • Check that the API key is valid and has not expired on your profile page.
  • Ensure the IP values are valid IPv4 or IPv6 addresses (not hostnames or CIDR ranges).
  • Test with a known IP: | makeresults | eval ip="8.8.8.8" | ipinsightslookup ip_field=ip.
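Invalid values can be filtered out before lookup with Python's standard ipaddress module, which implements exactly the check in the list above (a single IPv4 or IPv6 address; hostnames and CIDR ranges are rejected). A minimal pre-filter sketch:

```python
import ipaddress

def is_lookupable(value: str) -> bool:
    """True only for a single IPv4 or IPv6 address (no hostnames, no CIDR)."""
    try:
        ipaddress.ip_address(value.strip())
        return True
    except ValueError:
        return False

checks = [is_lookupable(v)
          for v in ("8.8.8.8", "2001:db8::1", "example.com", "10.0.0.0/8")]
```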

API Key: You can find your API key on your profile page. Don't have an account yet? Register for free.

Request Higher API Limit

Running a high-volume Splunk deployment? If the default rate limit isn't enough for your environment, submit a request below and we'll review it.
