Apache NiFi

NiFi: The Smart
Data Pipeline Builder

Think of NiFi as the delivery truck dispatcher at FreshMart — it decides what data goes where, in what format, on what schedule.

Amazon Web Services

SQS + SNS:
Cloud Messaging

AWS SQS is like a waiting line at the deli counter. SNS is like the store's PA announcement system.

Go Language

Go: Custom NiFi
Processors

Build your own custom NiFi processor in Go — add special business logic that no built-in processor has.

Section 0 — Beginner Guide

🚀 NiFi Step-by-Step: Grocery Store Workflows

Learn NiFi through FreshMart — a grocery store chain. Each workflow below is a real-world data task you can build step by step. No prior NiFi experience needed.

🎓
Core Concept

Think of NiFi like a Grocery Store Conveyor Belt

📋
FlowFile
One item on the belt. Could be a sale record, a product barcode, or a customer complaint.
⚙️
Processor
A station on the belt. Reads, transforms, routes, or sends each FlowFile.
➡️
Connection
The belt between stations. Queues FlowFiles when downstream is busy.
🗺️
Flow
The whole conveyor system. Processors + connections = your data pipeline.
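Here is a minimal Go sketch that makes the conveyor-belt picture concrete. It is a teaching model, not NiFi's API (NiFi itself is written in Java). The names `FlowFile`, `Processor`, `RunFlow`, `TagStore`, and `RequireSKU` are all hypothetical. A processor is modeled as a function that returns the relationship its output should follow, and the flow is a straight line of "success" connections.

```go
package main

import "strings"

// FlowFile: one item on the belt. Content is the payload; Attributes are metadata.
type FlowFile struct {
	Content    []byte
	Attributes map[string]string
}

// Processor: a station that handles one FlowFile and names the relationship
// ("success", "failure", ...) its output should follow.
type Processor func(ff FlowFile) (FlowFile, string)

// RunFlow pushes one FlowFile through the stations in order, as if each
// connection carried only "success". It stops at the first station that
// routes elsewhere and reports that relationship.
func RunFlow(ff FlowFile, stations ...Processor) (FlowFile, string) {
	rel := "success"
	for _, p := range stations {
		ff, rel = p(ff)
		if rel != "success" {
			break
		}
	}
	return ff, rel
}

// TagStore is an UpdateAttribute-style station: it only touches attributes.
func TagStore(ff FlowFile) (FlowFile, string) {
	attrs := make(map[string]string, len(ff.Attributes)+1)
	for k, v := range ff.Attributes {
		attrs[k] = v
	}
	attrs["store.id"] = "FM-042"
	return FlowFile{Content: ff.Content, Attributes: attrs}, "success"
}

// RequireSKU is a RouteOnContent-style station: it inspects the content.
func RequireSKU(ff FlowFile) (FlowFile, string) {
	if strings.Contains(string(ff.Content), `"sku"`) {
		return ff, "success"
	}
	return ff, "failure"
}
```

Calling `RunFlow(ff, TagStore, RequireSKU)` on a sale record returns it with `store.id` set on the "success" relationship. A record without a `sku` field comes out on "failure".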
Workflow 1 — Sales Data

🧾 POS Sales: Register → Database → Reports

Every time a cashier scans an item, a sale record is created. NiFi collects all register data across 50 stores, normalizes it, and loads it into the data warehouse every 15 minutes.

STEP-BY-STEP FLOW
1. ListenHTTP / GetFile: Receive JSON sale events from POS terminals, OR watch the /pos/sales/ folder for *.json files
2. ValidateRecord: Check the required fields store_id, sku, qty, price, cashier_id, timestamp
3. JoltTransformJSON: Rename fields to match the warehouse schema (price → unit_price, timestamp → sale_ts)
4. UpdateAttribute: Add environment=prod and ingest_time=${now()}
5. MergeRecord: Batch 100 records together for an efficient bulk insert
6. PutDatabaseRecord + PublishKafka: Load into the DW_SALES_FACT table AND publish to the pos-sales Kafka topic
💡 Example data: {"store_id":"FM-042", "sku":"BAN-CAVENDISH-1LB", "qty":3, "price":0.99, "cashier_id":"EMP-1147", "timestamp":"2024-11-01T14:23:00Z"}
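You can mimic the ValidateRecord and JoltTransformJSON steps in plain Go to check your expectations before configuring NiFi. This sketch assumes each sale arrives as one flat JSON object. `NormalizeSale` is a hypothetical helper. In NiFi itself, the rename would be a Jolt `shift` spec rather than Go code.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Fields ValidateRecord must find on every POS sale (step 2).
var requiredFields = []string{"store_id", "sku", "qty", "price", "cashier_id", "timestamp"}

// Renames applied in step 3 to match the warehouse schema.
var renames = map[string]string{"price": "unit_price", "timestamp": "sale_ts"}

// NormalizeSale validates one sale event and returns it with warehouse field names.
func NormalizeSale(raw []byte) ([]byte, error) {
	var rec map[string]any
	if err := json.Unmarshal(raw, &rec); err != nil {
		return nil, fmt.Errorf("invalid JSON: %w", err)
	}
	for _, f := range requiredFields {
		if _, ok := rec[f]; !ok {
			return nil, fmt.Errorf("missing required field %q", f)
		}
	}
	for from, to := range renames {
		rec[to] = rec[from]
		delete(rec, from)
	}
	return json.Marshal(rec) // map keys are emitted in sorted order
}
```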
Workflow 2 — Inventory

📦 Inventory Tracking: Stock Levels & Auto-Reorder

NiFi watches inventory levels across all warehouses. When bananas drop below 200 cases, it automatically triggers a reorder to the vendor and notifies the store manager.

STEP-BY-STEP FLOW
1. QueryDatabaseTable: Poll the INVENTORY table every 5 min, capturing rows where updated_at > last_run (updated_at is set as the Maximum-value Column)
2. SplitRecord: Split the batch result into individual product FlowFiles
3. RouteOnAttribute: Route qty < reorder_threshold → LOW-STOCK path, else → OK path (qty and reorder_threshold must first be copied from the record content into attributes, e.g. with EvaluateJsonPath)
4. InvokeHTTP: POST a reorder request to the vendor API: https://vendor.dole.com/api/orders
5. PutSQS: Send a low-stock alert to the freshmart-reorder-alerts SQS queue for the manager app
6. PutDatabaseRecord: Log the reorder event to the REORDER_LOG table for audit
Workflow 3 — Vendors

🚚 Vendor Deliveries: CSV Ingestion & Validation

Every morning, 12 vendors drop CSV files on an SFTP server. NiFi picks them up, validates each row, maps vendor SKUs to FreshMart SKUs, and loads them into the receiving system.

STEP-BY-STEP FLOW
1. GetSFTP: Poll sftp://vendors.freshmart.com/incoming/ for *.csv files every 30 min
2. UpdateAttribute: Extract vendor_id from the filename: ${filename:substringBefore('_')}
3. SplitRecord: Split a 500-row CSV into 500 individual FlowFiles
4. LookupRecord: Look up the FreshMart SKU for the vendor's item_code in the product catalog DB
5. RouteOnAttribute: Send unmatched SKUs to the failure/ folder and email an alert to the vendor (LookupRecord can also route these directly through its 'unmatched' relationship)
6. PublishKafka + PutS3Object: Publish to the vendor-deliveries topic AND archive the original CSV to S3
📁 Example CSV row (Dole vendor format): DOLE-BAN-001, Cavendish Bananas 1lb, 240, 0.28, 2024-11-01 → After LookupRecord + JoltTransform: {"sku":"BAN-CAVENDISH-1LB", "name":"Bananas 1lb", "cases":240, "unit_cost":0.28}
Workflow 4 — Shelves

🗂️ Shelf Location Updates: Planogram Sync

When corporate updates the store planogram (which product goes on which shelf), NiFi distributes the changes to all 50 store systems, the mobile app, and the digital shelf labels.

STEP-BY-STEP FLOW
1. InvokeHTTP / GetFile: Pull the planogram JSON from the HQ API or watch an upload folder (InvokeHTTP replaces the deprecated GetHTTP)
2. SplitJson: Split the array of shelf assignments into one FlowFile per product
3. EvaluateJsonPath: Extract aisle, section, and shelf_level into FlowFile attributes (UpdateAttribute cannot read FlowFile content)
4. PutDatabaseRecord: UPSERT into the PRODUCT_LOCATIONS table (used by the mobile app)
5. PutSNS: Broadcast the shelf change to all store systems via SNS fan-out
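Splitting the array and then extracting attributes boils down to three moves: decode the array, emit one record per element, and copy a few fields into string metadata. This Go sketch assumes a hypothetical planogram shape, `[{"sku":"…","aisle":3,"section":"B","shelf_level":2}, …]`. `SplitPlanogram` returns one content/attributes pair per product, the way each split FlowFile would carry them.

```go
package main

import (
	"encoding/json"
	"strconv"
)

// ShelfAssignment is one element of the planogram array (assumed shape).
type ShelfAssignment struct {
	SKU        string `json:"sku"`
	Aisle      int    `json:"aisle"`
	Section    string `json:"section"`
	ShelfLevel int    `json:"shelf_level"`
}

// Split is one output of the split: JSON content plus string attributes.
type Split struct {
	Content    []byte
	Attributes map[string]string
}

// SplitPlanogram splits the array into per-product records and copies
// aisle/section/shelf_level into attributes (FlowFile attributes are strings).
func SplitPlanogram(doc []byte) ([]Split, error) {
	var items []json.RawMessage
	if err := json.Unmarshal(doc, &items); err != nil {
		return nil, err
	}
	out := make([]Split, 0, len(items))
	for _, raw := range items {
		var a ShelfAssignment
		if err := json.Unmarshal(raw, &a); err != nil {
			return nil, err
		}
		out = append(out, Split{
			Content: raw, // each split keeps its element's original JSON
			Attributes: map[string]string{
				"aisle":       strconv.Itoa(a.Aisle),
				"section":     a.Section,
				"shelf_level": strconv.Itoa(a.ShelfLevel),
			},
		})
	}
	return out, nil
}
```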
Workflow 5 — Staff

👥 Staff Scheduling & HR Data Sync

Every Monday morning, NiFi pulls the weekly staff schedule from the HR system, generates shift reminders for each employee, and syncs attendance records to payroll.

STEP-BY-STEP FLOW
1. InvokeHTTP: GET https://hr.freshmart.internal/api/schedules/week (CRON-driven scheduling for Monday 6 AM, e.g. 0 0 6 ? * MON)
2. SplitRecord: Split the schedule into one FlowFile per shift
3. RouteOnAttribute: Route by department (Produce, Deli, Checkout, Stocking), after copying department, emp_name, day, and start_time from the record content into attributes (e.g. with EvaluateJsonPath)
4. ReplaceText: Build the SMS message "Hi ${emp_name}, your shift is ${day} at ${start_time}"
5. PutEmail / InvokeHTTP (Twilio): Send a shift reminder email or SMS to each employee
6. PutDatabaseRecord: Sync clock-in/out records to the PAYROLL_ATTENDANCE table
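The ReplaceText step fills an Expression Language template from each shift's attributes. Go's `text/template` package does the same job, as this sketch shows. The `Shift` type and `BuildSMS` are hypothetical. Their fields stand in for the emp_name, day, and start_time attributes, which are assumed to have been extracted from the split record already.

```go
package main

import (
	"strings"
	"text/template"
)

// Shift holds the values the ReplaceText template references.
type Shift struct {
	EmpName   string
	Day       string
	StartTime string
}

// smsTemplate mirrors: "Hi ${emp_name}, your shift is ${day} at ${start_time}"
var smsTemplate = template.Must(template.New("sms").Parse(
	"Hi {{.EmpName}}, your shift is {{.Day}} at {{.StartTime}}"))

// BuildSMS renders the reminder text for one shift.
func BuildSMS(s Shift) (string, error) {
	var b strings.Builder
	if err := smsTemplate.Execute(&b, s); err != nil {
		return "", err
	}
	return b.String(), nil
}
```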
Workflow 6 — Customer Feedback

⭐ Feedback & Complaints → Recall Detection

Customers submit feedback via the app, kiosks, or email. NiFi processes it in real time — routing complaints about specific products to the safety team and triggering automatic recalls if the same SKU gets 5+ complaints in an hour.

STEP-BY-STEP FLOW
1. ListenHTTP / ConsumeKafka: Receive feedback JSON from the app API or the customer-feedback Kafka topic
2. RouteOnContent: Regex match on the keywords "spoiled|sick|recall|rotten|smell" → SAFETY path
3. ExecuteGroovyScript: Count complaints per SKU in NiFi state and flag a recall when the count reaches 5 within the hour
4. RouteOnAttribute: Recall threshold hit? → RECALL path. Below threshold → LOG path
5. PutSNS (RECALL) / PutEmail (LOG): SNS broadcasts the recall to the manager, vendor, and health team; email logs routine feedback
6. PutDatabaseRecord: Write all feedback and its outcome to the CUSTOMER_FEEDBACK table for analysis
Bonus — Common Tasks

🔧 Other Common NiFi Tasks at FreshMart

📅
Daily Report Generation
QueryDatabaseTable → ConvertRecord (with a FreeFormTextRecordSetWriter) → PutEmail: sends a daily sales summary at 8 AM to all store managers.
🔄
Price Change Sync
InvokeHTTP (price feed) → JoltTransformJSON → PutDatabaseRecord → PublishKafka: syncs corporate price changes to all store systems within 2 minutes.
🗄️
Data Lake Archiving
ConsumeKafka → MergeRecord (batch 10k) → ConvertRecord (CSV) → PutS3Object — archives all events to S3 with date-partitioned keys for analytics.
🔍
Loyalty Card Processing
ConsumeKafka (pos-sales) → LookupRecord (customer DB) → ExecuteGroovyScript (calculate points) → InvokeHTTP (loyalty API) — adds points within 30 seconds of purchase.
🌡️
Cold Chain Monitoring
InvokeHTTP (temp sensors) → RouteOnAttribute (temp > 40°F) → PutSQS urgent alert → PutEmail to maintenance: monitors all refrigeration units every 2 minutes.
📊
Shrinkage Reporting
QueryDatabaseTable (inventory delta vs sales) → ExecuteGroovyScript (calculate shrink %) → RouteOnAttribute (high shrink flag) → PutEmail to loss prevention team.
Section 1 — Apache NiFi

🌊 What is Apache NiFi?

NiFi is a visual data pipeline builder — drag, drop, connect processors on a canvas, and data flows automatically. No heavy coding needed!

🖱️
Visual

Drag-and-drop canvas. Connect boxes with arrows. Each box = a Processor. Each arrow = a Connection carrying data.

📋
FlowFiles

Data moves as FlowFiles — packets with content + attributes (metadata). Like envelopes: the letter inside + address + stamps on the outside.

⚙️
Processors

300+ built-in processors: read files, call APIs, transform JSON, and talk to Kafka, SQS, and databases. Each has configuration properties and named relationships (such as success and failure) that route its output.

🔄
Back-pressure

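When a connection's queue reaches its back-pressure threshold (an object count or a data size), NiFi stops scheduling the processor that feeds it until the queue drains. It's like a store blocking the entrance when checkout is backed up.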

📜
Provenance

Every FlowFile's journey is recorded — where it came from, every transformation, where it went. Full audit trail, like a receipt for each data item.

🔁
Clustering

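Run multiple NiFi nodes as a cluster, typically coordinated by ZooKeeper. If one node fails, the others keep processing new data. FlowFiles already queued on the failed node are not replicated, though, so they wait there until that node comes back.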
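Back-pressure is essentially a bounded queue on each connection. Once a connection holds its object threshold (10,000 FlowFiles by default in NiFi 1.x), NiFi stops scheduling the upstream processor until downstream work frees space. A Go buffered channel behaves in a similar way, as this sketch shows. The `connection` type and the threshold of 3 are illustrative assumptions, not NiFi code.

```go
package main

// connection models a NiFi connection with an object back-pressure threshold
// as a buffered channel: offers succeed until the buffer is full.
type connection chan string

// TryOffer enqueues a FlowFile ID unless the queue is at its threshold, in
// which case the upstream "processor" must wait (NiFi would not schedule it).
func (c connection) TryOffer(id string) bool {
	select {
	case c <- id:
		return true
	default:
		return false // back-pressure applied
	}
}

// FillUntilBackPressure offers IDs in order and returns how many were
// accepted before the first refusal.
func FillUntilBackPressure(c connection, ids []string) int {
	for i, id := range ids {
		if !c.TryOffer(id) {
			return i
		}
	}
	return len(ids)
}
```

With `c := make(connection, 3)`, offering five IDs accepts three. Reading one from the channel (the downstream processor taking a FlowFile) makes room for exactly one more.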

🤔 When to use NiFi vs Kafka vs Both?
| Feature | Apache Kafka | Apache NiFi | Together |
|---|---|---|---|
| Primary job | Message streaming 🚀 | Data routing & ETL 🗺️ | Ingest → Stream → Route |
| Speed | Millions/sec ⚡ | Thousands/sec 🏃 | Best of both |
| Setup | Code required 💻 | Visual canvas 🖱️ | Visual + powerful |
| Best for | Real-time events | Data movement | Enterprise ETL |
| FreshMart use | POS → Inventory | Vendor CSV → Kafka | Full pipeline |
Section 2 — Interface

🖥️ The NiFi Canvas Interface

Canvas mock-up (breadcrumb: NiFi 🌊 FreshMart Root / Vendor Ingestion Flow; toolbar: ⚙️ Processor · ↔️ Connect · 🗂️ Funnel · 📦 ProcGroup · 📝 Label):
GetFile (/vendor/incoming/*.csv, schedule: 30 sec, Running ▶) → [queue 0 / 1K] → SplitRecord (CSV → JSON rows, 1 row per FlowFile, Running ▶) → [queue 12 / 1K] → PublishKafka (→ vendor-deliveries, bootstrap: kafka:9092, Running ▶) → [queue 0 / 1K] → PutSQS (→ freshmart-alerts, region: us-east-1, Running ▶)
failure → LogMessage (log bad CSV rows → error.log)
success → UpdateAttribute (add store.id = FM042, add ts = ${now()})
Status bar: Connected to NiFi cluster (3 nodes) · Active Threads: 4 · Queued: 12
🏷️ Label "Vendor CSV Ingestion Flow": Reads vendor delivery CSVs, splits into rows, publishes to Kafka + SQS
🔑 Key UI Areas
🖥️
Canvas
The big dark dotted area where you drag and drop components; your flow lives here. Pan by clicking and dragging, and zoom with the mouse wheel.
🔲
Processor Box
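Each box is a Processor. In the mock-up, color marks the source (orange = NiFi built-in, blue = Kafka, gold = AWS). In NiFi itself, every processor starts with the same default color, which you can change per processor. The real box shows the name and type at the top and 5-minute statistics (In, Read/Write, Out, Tasks/Time) below.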
↔️
Connections (Arrows)
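Arrows carry FlowFiles between processors. Each connection shows what is queued on it (e.g. "12 / 1K" in the mock-up), and in NiFi its label also names the relationship it carries. In the mock-up, a red dashed arrow marks a failure route.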
🟢
Status Indicators
A green ▶ means running, a red ■ means stopped, and a yellow ⚠ means invalid configuration (hover over it to see why). Each processor also shows active threads and recent throughput.
🧭 How to Navigate NiFi
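1. Access the NiFi UI: Open https://localhost:8443/nifi in a browser (port 8443 serves HTTPS, the default since NiFi 1.14).
2. Drag the Processor icon from the toolbar onto the canvas, then search for the processor by name (e.g. "GetFile") in the dialog that opens.
3. Configure it: Double-click the processor → Properties tab → fill in paths, topics, and credentials.
4. Connect processors: Hover over a processor, drag the arrow icon that appears in its center, drop it on the next processor, and choose which relationship(s) the connection carries.
5. Start the flow: Right-click the canvas → Start, or right-click individual processors. Any relationship that is not connected must be auto-terminated in the processor's configuration, or the processor stays invalid. Watch data flow!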
Section 3 — Processors

⚙️ Key NiFi Processors for FreshMart

📂
GetFile
Watches a folder for new files. Perfect for vendor CSV deliveries dropped into FreshMart's /sftp/vendor_deliveries/ folder.
🌐
InvokeHTTP
Calls any REST API. FreshMart uses this to pull weather data (affects produce prices) or check vendor portals for delivery updates.
📨
ConsumeKafka
Reads messages from a Kafka topic. NiFi consumes pos-sales to forward daily summaries to the data warehouse.
🗃️
QueryDatabaseTable
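Polls a database table for new or changed rows by tracking a maximum-value column (e.g. updated_at). Deletes are not captured. It picks up incremental, CDC-style changes from the legacy inventory database.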
✂️
SplitRecord
Splits one big file (CSV, JSON array) into individual records. A vendor CSV with 500 rows becomes 500 FlowFiles — one per delivery item.
🔄
JoltTransformJSON
Remaps JSON structure using a JOLT spec. Converts the vendor's format {item_code} to FreshMart's format {sku}.
🏷️
UpdateAttribute
Adds or modifies FlowFile attributes using NiFi Expression Language. Add store.id, processed.timestamp, environment.
🔀
ExecuteGroovyScript
Run custom Groovy code inside NiFi. Validates barcodes, calculates margins, applies business rules not covered by built-in processors.
🚦
RouteOnAttribute
If-then routing based on FlowFile attributes. Route spoiled product complaints to urgent path, normal complaints to regular path.
🧪
RouteOnContent
Routes by looking inside the FlowFile content using regex. Route messages containing "SPOILED" to the recall workflow.
🧲
MergeRecord
Collects many small FlowFiles and batches them into one. Bundle 100 individual sale records into one batch for bulk DB insert.
🔗
LookupRecord
Enriches records by looking up values in a database or cache. Lookup product name + category from SKU code in real time.
📤
PublishKafka
Writes FlowFiles as messages to a Kafka topic. NiFi publishes enriched vendor delivery events to vendor-deliveries topic.
🗄️
PutDatabaseRecord
Inserts records directly into a database table. Loads transformed sales data into DW_SALES_FACT table in IBM Db2.
📧
PutEmail
Sends email alerts. When a product recall is triggered, NiFi emails the store manager and vendor contact automatically.
📁
PutS3Object
Saves FlowFiles to AWS S3. Archive daily sales reports, vendor CSVs, and compliance logs to S3 for long-term storage.
📬
PutSQS
Sends a FlowFile as a message to an Amazon SQS queue. Route high-priority alerts to the freshmart-urgent-alerts SQS queue for the manager app.
📭
GetSQS
Polls an SQS queue and turns each message into a FlowFile. Receive vendor shipment confirmation messages from the supplier's SQS queue.
📣
PutSNS
Sends FlowFile content as a notification to an SNS topic, which delivers it to every subscriber. A spoiled-product alert triggers an SMS to the manager, an email to the vendor, and (through an SQS or Lambda subscriber that writes to Kafka) a Kafka event, all at once.
Section 4 — Real Flows

🛒 FreshMart NiFi Flow Scenarios

Flow #1

🚚 Vendor CSV Delivery Ingestion

① Watch folder: GetFile (/sftp/vendor/, *.csv files) → ② Validate: ValidateRecord (CSV schema check, SKU required fields) → ③ Transform: JoltTransform (item_code → sku, vendor format → FM format) → ④ Publish to Kafka: PublishKafka (vendor-deliveries, kafka:9092) → ⑤ Archive to S3: PutS3Object (original CSV to s3://fm-archive/)

When Dole Foods drops a CSV of banana deliveries in the SFTP folder, NiFi immediately picks it up, validates each row, translates vendor codes to FreshMart SKUs, publishes to Kafka (so inventory updates instantly), and archives the original file to S3 for compliance.

Flow #2 — URGENT

😤 Customer Complaint → Recall Alert

① Read complaints: ConsumeKafka (customer-complaints topic) → ② Route HIGH severity: RouteOnAttribute (severity == HIGH AND count >= 3) → HIGH path fans out to ③ Broadcast SNS: PutSNS (freshmart-recall SNS topic), ④ Alert manager SQS: PutSQS (urgent-mgr-queue, manager app), and ⑤ Kafka recall event: PublishKafka (product-recalls → display signs). LOW severity → standard CRM logging flow.

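When 3+ HIGH severity complaints about the same product arrive within 30 minutes, NiFi routes them to a recall flow. RouteOnAttribute only compares attributes, so a stateful step (such as the ExecuteGroovyScript counter in Workflow 6) must set the count first. The recall flow then does three things:
- broadcasts an SNS alert, which triggers an SMS to the manager and an email to the vendor
- sends an urgent SQS message to the manager app
- publishes a Kafka event that updates all digital shelf signs to "REMOVED"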

Flow #3 — Nightly ETL

📊 Sales Aggregation → Data Warehouse

① Consume Kafka: ConsumeKafka (pos-sales topic, all day's events) → ② Batch records: MergeRecord (500 records or a 10-min window) → ③ Enrich: LookupRecord (SKU → product DB lookup) → ④ Schema map: JoltTransform (→ DW star schema format) → ⑤ Load to DW: PutDatabaseRecord (→ IBM Db2 DW, DW_SALES_FACT), with ⑥ Archive: PutS3Object (backup JSON to the data lake)

Every 10 minutes, NiFi batches the latest POS sale events from Kafka, enriches each record with full product names and categories via a database lookup, transforms the structure to match the data warehouse star schema, and bulk-inserts into IBM Db2 — all while archiving raw JSON to the S3 data lake.

Section 5 — Amazon SQS

📬 Amazon SQS: The Deli Waiting Line

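SQS (Simple Queue Service) is like the numbered ticket system at the deli counter. You take a ticket, your order waits in line, and a deli worker handles it when ready. SQS stores messages redundantly. Each one waits until a consumer deletes it or the queue's retention period runs out (4 days by default, configurable up to 14 days).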

📋
Standard Queue

High throughput, messages may arrive out of order or more than once. Great for FreshMart inventory updates where processing speed matters more than perfect order.

🎯
FIFO Queue

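First-In-First-Out: messages are delivered in order within each message group, with exactly-once processing (duplicates sent within the 5-minute deduplication interval are discarded). Used for FreshMart vendor payment processing, where order matters.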

Dead-Letter Queue

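If a message is received more times than the redrive policy's maxReceiveCount (5 at FreshMart) without being deleted, SQS moves it to a Dead-Letter Queue (DLQ) for investigation. The DLQ must match the source queue type: a FIFO DLQ for FIFO queues, a standard DLQ for standard queues. It's like setting aside the deli order nobody picks up.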
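The retry limit is not fixed by SQS. You set it per source queue with a redrive policy, a small JSON document stored in the queue's `RedrivePolicy` attribute that names the DLQ's ARN and the `maxReceiveCount`. This sketch only builds that JSON with the standard library. The ARN is a placeholder in the style of the ones used in this guide. You would apply the result through the SDK's SetQueueAttributes call or the console. `BuildRedrivePolicy` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"strconv"
)

// redrivePolicy is the JSON stored in a source queue's RedrivePolicy attribute.
type redrivePolicy struct {
	DeadLetterTargetArn string `json:"deadLetterTargetArn"`
	MaxReceiveCount     string `json:"maxReceiveCount"` // AWS examples pass this as a string
}

// BuildRedrivePolicy returns the attribute value that moves a message to the
// DLQ once it has been received more than maxReceives times without deletion.
func BuildRedrivePolicy(dlqARN string, maxReceives int) (string, error) {
	b, err := json.Marshal(redrivePolicy{
		DeadLetterTargetArn: dlqARN,
		MaxReceiveCount:     strconv.Itoa(maxReceives),
	})
	return string(b), err
}
```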

FreshMart SQS Architecture (4 queues)
Producers: NiFi PutSQS · Kafka Consumer · Go Microservice · Lambda Trigger
Amazon SQS (service managed by AWS) → consumers:
freshmart-inventory-updates: Standard · retention 4 days · max 256 KB → Inventory Service (long poll, 20 sec)
freshmart-urgent-alerts.fifo: FIFO · exactly-once · manager app → Manager Mobile App (push notification)
freshmart-vendor-payments.fifo: FIFO · ordered payments · finance → Finance System (AP/payment processing)
freshmart-dlq (dead letter): failed after 5 retries → investigation → CloudWatch Alert (engineering on-call)
Go — Send to SQS
package main

import (
	"context"
	"fmt"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/aws/aws-sdk-go-v2/service/sqs/types"
)

// SendInventoryAlert sends a LOW_STOCK alert for one SKU to the inventory queue.
func SendInventoryAlert(ctx context.Context, sku string, qty int) error {
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		return fmt.Errorf("load AWS config: %w", err)
	}
	client := sqs.NewFromConfig(cfg)

	queueURL := "https://sqs.us-east-1.amazonaws.com/123456789/freshmart-inventory-updates"
	body := fmt.Sprintf(`{"sku":"%s","qty":%d,"alert":"LOW_STOCK"}`, sku, qty)

	_, err = client.SendMessage(ctx, &sqs.SendMessageInput{
		QueueUrl:    aws.String(queueURL),
		MessageBody: aws.String(body),
		// Attributes for filtering/tracing, sent alongside the body
		MessageAttributes: map[string]types.MessageAttributeValue{
			"store_id": {
				DataType:    aws.String("String"),
				StringValue: aws.String("FRESHMART-042"),
			},
		},
	})
	return err
}
Go — Receive from SQS (Long Poll)
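// Same package and imports as the send example above.

// PollInventoryQueue long-polls once and deletes each message only after it is processed.
func PollInventoryQueue(ctx context.Context, client *sqs.Client, queueURL string,
	processAlert func(body string) error) error {
	// Long polling: wait up to 20 s for messages instead of returning empty at once
	result, err := client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
		QueueUrl:            aws.String(queueURL),
		MaxNumberOfMessages: 10, // SQS maximum per call
		WaitTimeSeconds:     20, // long poll (20 s is the maximum)
		VisibilityTimeout:   30, // hide each received message from other consumers for 30 s
	})
	if err != nil {
		return err
	}
	for _, msg := range result.Messages {
		if err := processAlert(aws.ToString(msg.Body)); err != nil {
			// Not deleted: the message returns to the queue after VisibilityTimeout
			continue
		}
		// Delete only after success
		if _, err := client.DeleteMessage(ctx, &sqs.DeleteMessageInput{
			QueueUrl:      aws.String(queueURL),
			ReceiptHandle: msg.ReceiptHandle,
		}); err != nil {
			return err
		}
	}
	return nil
}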
Section 6 — Amazon SNS

📣 Amazon SNS: The PA System

SNS (Simple Notification Service) is like the store PA system. One announcement reaches everyone who's listening — SMS, email, SQS queues, Lambda functions, all at once.

FreshMart SNS Fan-out: Product Recall Alert
Publisher (1): NiFi PutSNS sends 1 message (SKU: STRAW-PINT, severity: RECALL)
SNS topic (fan-out): freshmart-product-recalls, 5 active subscriptions, filter policy on store_id
Subscribers (5 at once):
📱 SMS → Store Manager (+1-555-FRESH-01, instant, ≈ 1 sec)
📧 Email → Vendor Contact (vendor@dole.com, SMTP relay, ≈ 2 sec)
📬 SQS → Manager App (freshmart-urgent-alerts.fifo, ≈ 500 ms)
λ Lambda → Remove Signs (updates the shelf display fleet, ≈ 200 ms)
📨 Kafka → NiFi Log Flow (product-recalls topic, ≈ 300 ms)
Note: SNS has no native Kafka subscription type, so the Kafka leg needs an intermediary such as a Lambda function or SQS consumer that produces to Kafka. Also, an SQS FIFO queue can only subscribe to an SNS FIFO topic.
SNS vs SQS — Quick comparison
📣
SNS = Publish once, many listeners. Like a PA announcement — everyone hears it. Used when the same event needs multiple systems to react simultaneously.
📬
SQS = One listener per message. Like the deli counter ticket — one worker processes each ticket. Used when exactly ONE system should handle each message.
🤝
SNS + SQS Fan-out: SNS broadcasts → each subscription is its own SQS queue → each queue has its own consumer. Best of both worlds!
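Fan-out with filtering can be modeled in a few lines. One publish is copied to every subscription whose filter policy matches the message attributes, and a subscription with no policy gets everything. In this Go sketch, each subscriber's copies land in its own slice, standing in for its own SQS queue. The `Topic` and `Subscription` types and exact-match filters are simplifying assumptions. Real SNS filter policies also support prefix, numeric, and other operators.

```go
package main

// Message is a published notification with its message attributes.
type Message struct {
	Body       string
	Attributes map[string]string
}

// Subscription receives copies of matching messages. Filter maps an attribute
// name to its allowed values; a nil Filter accepts every message.
type Subscription struct {
	Name   string
	Filter map[string][]string
	Inbox  []Message // stands in for the subscriber's own SQS queue
}

// Topic fans each published message out to all matching subscriptions.
type Topic struct {
	Subs []*Subscription
}

// matches reports whether every filtered attribute is present with an allowed value.
func matches(filter map[string][]string, attrs map[string]string) bool {
	for key, allowed := range filter {
		v, ok := attrs[key]
		if !ok {
			return false
		}
		found := false
		for _, a := range allowed {
			if a == v {
				found = true
				break
			}
		}
		if !found {
			return false
		}
	}
	return true
}

// Publish delivers m to every matching subscription and returns the copy count.
func (t *Topic) Publish(m Message) int {
	n := 0
	for _, s := range t.Subs {
		if matches(s.Filter, m.Attributes) {
			s.Inbox = append(s.Inbox, m)
			n++
		}
	}
	return n
}
```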
Go — Publish SNS Recall Alert
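package main

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sns"
	snstypes "github.com/aws/aws-sdk-go-v2/service/sns/types"
)

// PublishRecallAlert broadcasts a PRODUCT_RECALL event to the recall topic.
func PublishRecallAlert(ctx context.Context, sku, reason string) error {
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		return fmt.Errorf("load AWS config: %w", err)
	}
	client := sns.NewFromConfig(cfg)
	topicARN := "arn:aws:sns:us-east-1:123456:freshmart-product-recalls"

	// json.Marshal escapes quotes in sku/reason, unlike hand-built Sprintf JSON
	msg, err := json.Marshal(map[string]string{
		"event":     "PRODUCT_RECALL",
		"sku":       sku,
		"reason":    reason,
		"store":     "FRESHMART-042",
		"timestamp": time.Now().Format(time.RFC3339),
	})
	if err != nil {
		return err
	}

	_, err = client.Publish(ctx, &sns.PublishInput{
		TopicArn: aws.String(topicARN),
		Message:  aws.String(string(msg)),
		Subject:  aws.String("URGENT: Product Recall - " + sku),
		// Subscriptions whose filter policy requires these attributes only receive
		// matching messages; subscriptions without a filter policy receive all.
		MessageAttributes: map[string]snstypes.MessageAttributeValue{
			"store_id": {DataType: aws.String("String"), StringValue: aws.String("FRESHMART-042")},
			"severity": {DataType: aws.String("String"), StringValue: aws.String("CRITICAL")},
		},
	})
	return err
}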
Section 7 — Combined Architecture

🏗️ NiFi + Kafka + AWS Together

Store systems: 🧾 POS Scanners · 🏷️ Inventory DB · 🚚 Vendor SFTP · 😤 Complaint API · 🌐 Weather API
NiFi canvas:
INGEST: GetFile · ConsumeKafka · InvokeHTTP · QueryDatabaseTable
TRANSFORM: JoltTransform · SplitRecord · LookupRecord · UpdateAttribute
ROUTE: RouteOnAttribute · MergeRecord · RouteOnContent · ValidateRecord
OUTPUT: PublishKafka · PutSQS · PutSNS · PutDatabaseRecord · PutS3Object
Kafka cluster topics: pos-sales · inventory-updates · vendor-deliveries · customer-complaints · product-recalls · daily-sales-report
AWS cloud (via PutSQS / PutSNS / PutS3Object):
SQS queues: freshmart-inventory-updates (standard) · freshmart-urgent-alerts.fifo · freshmart-vendor-payments.fifo
SNS topics: freshmart-product-recalls · freshmart-promotions-broadcast
S3 data lake: s3://freshmart-data-lake/ (raw/ · processed/ · archives/ · compliance/)
🦁 ZooKeeper coordinates the Kafka cluster (in ZooKeeper-mode deployments; Kafka 4.0 and later use KRaft instead) and also manages the NiFi cluster.
Section 8 — Go Custom Processors

🐹 Building Custom NiFi Processors in Go

NiFi's built-in processors don't cover every business need. When you need FreshMart-specific logic — like validating a proprietary barcode format — you can build a custom processor. Go is great because it's fast, compiles to a single binary, and has excellent AWS SDK support.

🏗️ How Custom NiFi Processors Work
1. Stateless Processor (Go binary): Write a Go program that reads stdin, processes the data, and writes stdout. NiFi runs it through the ExecuteStreamCommand processor.
2. Go microservice (HTTP or gRPC): Run the Go logic as a separate service for more complex processing, and call it from the flow (for example with InvokeHTTP against an HTTP/JSON endpoint).
3. Java NAR wrapping Go: Wrap a Go executable in a Java NiFi Archive (NAR) file. Go does the logic; the Java wrapper provides the NiFi interface.
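ExecuteStreamCommand (Command: /opt/fm/barcode-validator) writes the FlowFile to stdin → Go binary barcode-validator (1. read JSON from stdin; 2. validate the FM barcode format) → stdout becomes the new FlowFile content, carrying fields such as valid = true/false, barcode.format = "FM-EAN13", sku = "APPL-GALA-1LB" → routed to valid or invalid. ExecuteStreamCommand does not turn stdout into attributes by itself, so routing on valid needs an EvaluateJsonPath step (content → attributes) before RouteOnAttribute.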
main.go — FreshMart Barcode Validator (NiFi stdin/stdout)
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"os"
	"regexp"
	"strings"
)

// FlowFile content structure from NiFi (one JSON object per line)
type DeliveryItem struct {
	ItemCode  string  `json:"item_code"`
	Qty       int     `json:"qty"`
	UnitPrice float64 `json:"unit_price"`
	VendorID  string  `json:"vendor_id"`
}

// Result embeds the input so every original field is echoed back to NiFi
type Result struct {
	DeliveryItem
	SKU         string `json:"sku"`
	Valid       bool   `json:"valid"`
	BarcodeType string `json:"barcode_type"`
	ErrorMsg    string `json:"error,omitempty"`
}

// Assumed FreshMart SKU format: NAME[-VARIETY]-[count]UNIT, loose enough that every
// SKU in vendorToSKU (including PINEAPPLE-EA) passes; adjust to the real spec.
var fmSKUPattern = regexp.MustCompile(`^[A-Z]{3,10}(-[A-Z]{2,10})?-\d{0,3}[A-Z]{2,4}$`)

// Vendor item_code → FM SKU lookup map
var vendorToSKU = map[string]string{
	"DOLE-BAN-001": "BAN-CAVENDISH-1LB",
	"DOLE-PNA-002": "PINEAPPLE-EA",
	"DOLE-MAN-003": "MANGO-ATAULFO-EA",
}

func main() {
	scanner := bufio.NewScanner(os.Stdin)
	for scanner.Scan() {
		var item DeliveryItem
		if err := json.Unmarshal(scanner.Bytes(), &item); err != nil {
			fmt.Fprintf(os.Stderr, "parse error: %v\n", err)
			continue
		}
		out, _ := json.Marshal(processItem(item)) // cannot fail for this struct
		fmt.Println(string(out))                  // stdout → NiFi FlowFile content
	}
	if err := scanner.Err(); err != nil {
		// A nonzero exit sends the output to ExecuteStreamCommand's "nonzero status"
		fmt.Fprintf(os.Stderr, "read error: %v\n", err)
		os.Exit(1)
	}
}

func processItem(item DeliveryItem) Result {
	r := Result{DeliveryItem: item}
	// 1. Translate vendor code → FM SKU
	sku, found := vendorToSKU[strings.ToUpper(item.ItemCode)]
	if !found {
		r.ErrorMsg = "unknown vendor code: " + item.ItemCode
		return r
	}
	r.SKU = sku
	// 2. Validate the FM SKU format
	r.Valid = fmSKUPattern.MatchString(sku)
	if r.Valid {
		r.BarcodeType = "FM-SKU-V2"
	} else {
		r.ErrorMsg = "invalid SKU format: " + sku
	}
	return r
}
margin_calculator.go — Real-time Profit Margin Enrichment
package main

import (
	"database/sql"
	"encoding/json"
	"fmt"
	"math"
	"os"

	_ "github.com/lib/pq" // PostgreSQL driver (registers "postgres")
)

type SaleEvent struct {
	SKU          string  `json:"sku"`
	SalePrice    float64 `json:"price"`
	Qty          int     `json:"qty"`
	DepartmentID string  `json:"dept_id"`
}

type EnrichedSale struct {
	SaleEvent
	CostPrice  float64 `json:"cost_price"`
	MarginPct  float64 `json:"margin_pct"`
	MarginUSD  float64 `json:"margin_usd"`
	ProfitTier string  `json:"profit_tier"` // HIGH/MED/LOW
}

var db *sql.DB

func init() {
	var err error
	// sql.Open only validates its arguments; the first query opens a connection
	db, err = sql.Open("postgres", "host=db.freshmart.internal dbname=products sslmode=require")
	if err != nil {
		fmt.Fprintf(os.Stderr, "db open: %v\n", err)
		os.Exit(1)
	}
}

func lookupCost(sku string) (float64, error) {
	var cost float64
	err := db.QueryRow(
		"SELECT unit_cost FROM product_costs WHERE sku=$1 AND active=true", sku).Scan(&cost)
	return cost, err
}

func calcMargin(sale SaleEvent) EnrichedSale {
	cost, err := lookupCost(sale.SKU)
	e := EnrichedSale{SaleEvent: sale, CostPrice: cost}
	// Skip margin math if the lookup failed or the price would divide by zero
	if err != nil || cost <= 0 || sale.SalePrice <= 0 {
		return e
	}
	gross := (sale.SalePrice - cost) * float64(sale.Qty)
	e.MarginUSD = math.Round(gross*100) / 100
	e.MarginPct = math.Round((sale.SalePrice-cost)/sale.SalePrice*10000) / 100
	switch {
	case e.MarginPct >= 40:
		e.ProfitTier = "HIGH"
	case e.MarginPct >= 20:
		e.ProfitTier = "MED"
	default:
		e.ProfitTier = "LOW"
	}
	return e
}

func main() {
	decoder := json.NewDecoder(os.Stdin)
	encoder := json.NewEncoder(os.Stdout)
	for {
		var sale SaleEvent
		if err := decoder.Decode(&sale); err != nil {
			break // io.EOF (or malformed input) ends the stream
		}
		if err := encoder.Encode(calcMargin(sale)); err != nil {
			fmt.Fprintf(os.Stderr, "encode: %v\n", err)
			os.Exit(1)
		}
	}
}
freshness_checker.go — Produce Freshness + Auto-Discount Logic
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"time"
)

type ProduceItem struct {
	SKU         string  `json:"sku"`
	ArrivalDate string  `json:"arrival_date"` // RFC3339
	ExpiryDate  string  `json:"expiry_date"`  // RFC3339
	CurrentQty  int     `json:"current_qty"`
	BasePrice   float64 `json:"base_price"`
	Category    string  `json:"category"`
}

type FreshnessResult struct {
	ProduceItem
	DaysUntilExpiry int     `json:"days_until_expiry"`
	FreshnessScore  int     `json:"freshness_score"` // 0-10
	DiscountPct     float64 `json:"discount_pct"`
	DiscountedPrice float64 `json:"discounted_price"`
	Action          string  `json:"action"` // SELL/DISCOUNT/DONATE/REMOVE
	DisplayText     string  `json:"display_text"`
}

func checkFreshness(item ProduceItem, now time.Time) (FreshnessResult, error) {
	expiry, err := time.Parse(time.RFC3339, item.ExpiryDate)
	if err != nil {
		// Unchecked, a bad date parses as year 1 and the item would be wrongly removed
		return FreshnessResult{}, fmt.Errorf("sku %s: bad expiry_date: %w", item.SKU, err)
	}
	// Whole days remaining, truncated toward zero (under 24 h left counts as 0)
	days := int(expiry.Sub(now).Hours() / 24)
	r := FreshnessResult{ProduceItem: item, DaysUntilExpiry: days}
	switch {
	case days <= 0:
		r.FreshnessScore, r.Action = 0, "REMOVE"
		r.DisplayText = "❌ Remove from shelf"
	case days == 1:
		r.FreshnessScore = 3
		r.DiscountPct = 50
		r.Action = "DISCOUNT"
		r.DiscountedPrice = item.BasePrice * 0.50
		r.DisplayText = fmt.Sprintf("⚡ Flash: $%.2f (50%% off!)", r.DiscountedPrice)
	case days <= 3:
		r.FreshnessScore = 6
		r.DiscountPct = 25
		r.Action = "DISCOUNT"
		r.DiscountedPrice = item.BasePrice * 0.75
		r.DisplayText = fmt.Sprintf("🏷️ Today's Deal: $%.2f", r.DiscountedPrice)
	default:
		r.FreshnessScore = 9
		r.Action = "SELL"
		r.DiscountedPrice = item.BasePrice
		r.DisplayText = fmt.Sprintf("✅ Fresh! $%.2f", item.BasePrice)
	}
	return r, nil
}

func main() {
	dec, enc := json.NewDecoder(os.Stdin), json.NewEncoder(os.Stdout)
	for {
		var item ProduceItem
		if err := dec.Decode(&item); err != nil {
			break // io.EOF ends the stream
		}
		r, err := checkFreshness(item, time.Now())
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			continue
		}
		enc.Encode(r)
	}
}
server.go: Go HTTP microservice called by NiFi InvokeHTTP
package main

import (
	"context"
	"encoding/json"
	"log"
	"net/http"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sns"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
)

type RecallRequest struct {
	SKU      string `json:"sku"`
	StoreID  string `json:"store_id"`
	Reason   string `json:"reason"`
	Severity string `json:"severity"`
}

type RecallResponse struct {
	Success  bool   `json:"success"`
	SNSMsgID string `json:"sns_message_id"`
	SQSMsgID string `json:"sqs_message_id"`
}

const (
	qURL     = "https://sqs.us-east-1.amazonaws.com/123456/freshmart-urgent-alerts.fifo"
	topicARN = "arn:aws:sns:us-east-1:123456:freshmart-product-recalls"
)

// Clients are created once at startup and shared by all requests
var (
	sqsClient *sqs.Client
	snsClient *sns.Client
)

// NiFi calls POST /recall-trigger with JSON body
// This Go service fans out to BOTH SNS and SQS
func recallHandler(w http.ResponseWriter, r *http.Request) {
	var req RecallRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "invalid JSON: "+err.Error(), http.StatusBadRequest)
		return
	}
	body, _ := json.Marshal(req) // a struct of strings always marshals

	// 1. Send to SQS (manager app FIFO queue): ordered per store, deduplicated per SKU+store
	sqsOut, err := sqsClient.SendMessage(r.Context(), &sqs.SendMessageInput{
		QueueUrl:               aws.String(qURL),
		MessageBody:            aws.String(string(body)),
		MessageGroupId:         aws.String(req.StoreID),
		MessageDeduplicationId: aws.String(req.SKU + req.StoreID),
	})
	if err != nil {
		http.Error(w, "sqs: "+err.Error(), http.StatusBadGateway)
		return
	}

	// 2. Publish to SNS (broadcast to all)
	snsOut, err := snsClient.Publish(r.Context(), &sns.PublishInput{
		TopicArn: aws.String(topicARN),
		Message:  aws.String(string(body)),
		Subject:  aws.String("RECALL: " + req.SKU),
	})
	if err != nil {
		http.Error(w, "sns: "+err.Error(), http.StatusBadGateway)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(RecallResponse{
		Success:  true,
		SNSMsgID: aws.ToString(snsOut.MessageId),
		SQSMsgID: aws.ToString(sqsOut.MessageId),
	})
}

func main() {
	cfg, err := config.LoadDefaultConfig(context.Background())
	if err != nil {
		log.Fatalf("load AWS config: %v", err)
	}
	sqsClient = sqs.NewFromConfig(cfg)
	snsClient = sns.NewFromConfig(cfg)

	http.HandleFunc("/recall-trigger", recallHandler)
	http.HandleFunc("/health", func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
	log.Println("FreshMart Go service listening :8080")
	log.Fatal(http.ListenAndServe(":8080", nil))
}
📦 Go Packages for NiFi + AWS Work
☁️
aws-sdk-go-v2
github.com/aws/aws-sdk-go-v2
Official AWS SDK. Packages for SQS, SNS, S3, Lambda, DynamoDB.
🌐
net/http
Go stdlib HTTP server. Build the REST endpoint that NiFi's InvokeHTTP processor calls for custom logic.
📊
encoding/json
Go stdlib JSON encoder/decoder. Parse FlowFile content from stdin, write results to stdout. Zero dependencies!
📨
kafka-go
github.com/segmentio/kafka-go
Pure Go Kafka client. Build Go microservices that talk to Kafka without NiFi.
🗄️
database/sql
Go stdlib DB access + drivers for PostgreSQL (lib/pq) and IBM Db2. Look up product costs and categories at runtime.
🧪
testing + testify
github.com/stretchr/testify
Unit test your Go NiFi processors before deploying. Mock FlowFile input with stdin simulation.
Section 10 — Java Custom Processors

☕ Building Native NiFi Processors in Java

Java is NiFi's native language — a Java custom processor lives inside the NiFi JVM, gets full access to the FlowFile API, Controller Services, State Management, and the UI's Properties panel. It becomes a first-class citizen indistinguishable from the 300+ built-in processors.

Native NAR Processor

Packaged as a NiFi Archive (.nar), NiFi's plugin format. Drop it in the lib/ folder and restart, and it appears in the Add Processor dialog like any built-in. NiFi 1.9+ can also load new NARs from the extensions/ autoload directory without a restart. Full UI, custom properties, all relationships.

🔌
Controller Service

Shared connection pools, credential providers, schema registries — a Controller Service is a reusable Java object that processors reference. Build a FreshMart vendor lookup service once, use it in 10 processors.

🗺️
Reporting Task

A Reporting Task runs on a schedule and has access to NiFi cluster-level metrics — queue depths, throughput, bulletins. Use it to push FreshMart pipeline health metrics to Prometheus or CloudWatch.

⚖️ Java Native NAR vs Go stdin/stdout — When to Use Each
| Capability | Java NAR | Go stdin/stdout |
|---|---|---|
| Native NiFi UI properties | ✅ Full property descriptors, validators, EL | ❌ No UI properties |
| Access Controller Services | ✅ Direct Java API | ❌ Not directly |
| Multiple relationships | ✅ success, failure, retry, etc. | ⚠️ Only ExecuteStreamCommand's fixed relationships (output stream, original, nonzero status) |
| State management | ✅ Built-in local/cluster state | ❌ External state needed |
| Deployment | ⚠️ Maven build → .nar → restart | ✅ Copy binary, no restart |
| Dev speed | ⚠️ Slower (Maven, Java verbosity) | ✅ Fast iteration |
| Performance | ✅ No IPC overhead, JVM optimized | ⚠️ stdin/stdout IPC cost |
| Best for | Complex routing, shared services, production enterprise use | Quick business logic, microservice calls, AWS SDK operations |
📁 Maven Multi-Module Project Layout
# freshmart-nifi-processors/
# Maven multi-module: one module per NAR
freshmart-nifi-processors/
├── pom.xml                          # parent POM
│
├── nifi-freshmart-processors/       # processor implementations
│   ├── pom.xml
│   └── src/main/java/com/freshmart/nifi/processors/
│       ├── FreshMartBarcodeValidator.java
│       ├── FreshMartMarginEnricher.java
│       ├── FreshMartRecallRouter.java
│       └── FreshMartVendorConnector.java
│
├── nifi-freshmart-services/         # controller services
│   ├── pom.xml
│   └── src/main/java/com/freshmart/nifi/
│       ├── FreshMartLookupService.java
│       └── FreshMartLookupServiceImpl.java
│
├── nifi-freshmart-nar/              # packages the processors into a .nar
│   └── pom.xml
│
└── nifi-freshmart-services-nar/     # packages the controller services into a .nar
    └── pom.xml

# Build command:  mvn clean package -DskipTests
# Output:         nifi-freshmart-nar/target/nifi-freshmart-nar-1.0.0.nar
# Deploy:         cp nifi-freshmart-nar/target/*.nar nifi-freshmart-services-nar/target/*.nar $NIFI_HOME/lib/ && $NIFI_HOME/bin/nifi.sh restart
<!-- parent pom.xml -->
<project>
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.freshmart.nifi</groupId>
  <artifactId>freshmart-nifi-processors</artifactId>
  <version>1.0.0</version>
  <packaging>pom</packaging>

  <properties>
    <nifi.version>1.24.0</nifi.version>
    <java.version>17</java.version>
    <!-- Wire java.version into the compiler so it actually takes effect -->
    <maven.compiler.release>${java.version}</maven.compiler.release>
  </properties>

  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-api</artifactId>
        <version>${nifi.version}</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-utils</artifactId>
        <version>${nifi.version}</version>
      </dependency>
      <!-- TestRunner / MockFlowFile used by the processor unit tests -->
      <dependency>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-mock</artifactId>
        <version>${nifi.version}</version>
        <scope>test</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>

  <modules>
    <module>nifi-freshmart-processors</module>
    <module>nifi-freshmart-services</module>
    <module>nifi-freshmart-nar</module>
  </modules>
</project>
FreshMartBarcodeValidator.java — validates vendor item codes → FM SKUs with full NiFi UI properties
package com.freshmart.nifi.processors;

import org.apache.nifi.annotation.behavior.*;
import org.apache.nifi.annotation.documentation.*;
import org.apache.nifi.components.*;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.util.StandardValidators;
import com.fasterxml.jackson.databind.*;
import java.nio.charset.StandardCharsets;
import java.util.*;

// Annotations appear in the NiFi UI tooltip and docs
@Tags({"freshmart", "barcode", "validation", "vendor"})
@CapabilityDescription("Validates vendor item codes and translates them to "
        + "FreshMart internal SKU format. Routes valid items to 'valid', "
        + "unknown codes to 'unmatched', and parse errors to 'failure'.")
@SupportsBatching   // lets the framework batch session commits (Run Duration setting)
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@WritesAttributes({
    @WritesAttribute(attribute = "fm.sku", description = "Translated FreshMart SKU"),
    @WritesAttribute(attribute = "fm.barcode.type", description = "Barcode format (FM-SKU-V2)"),
    @WritesAttribute(attribute = "fm.vendor.code", description = "Original vendor item code"),
    @WritesAttribute(attribute = "fm.valid", description = "true/false"),
    @WritesAttribute(attribute = "fm.error", description = "Why the code was not translated")
})
public class FreshMartBarcodeValidator extends AbstractProcessor {

    // ── PROPERTY DESCRIPTORS (appear in NiFi UI Properties tab) ──
    static final PropertyDescriptor VENDOR_ID = new PropertyDescriptor.Builder()
            .name("vendor-id")
            .displayName("Vendor ID")
            .description("The vendor code (DOLE, DEL-MONTE, etc.) used to look up the correct SKU mapping table")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    static final PropertyDescriptor STRICT_MODE = new PropertyDescriptor.Builder()
            .name("strict-mode")
            .displayName("Strict Validation Mode")
            .description("If true, route unknown codes to 'unmatched'. "
                    + "If false, pass them through to 'valid' with a warning attribute.")
            .required(true)
            .allowableValues("true", "false")
            .defaultValue("true")
            .build();

    // ── RELATIONSHIPS (appear in NiFi UI Relationships tab) ──
    static final Relationship REL_VALID = new Relationship.Builder()
            .name("valid")
            .description("FlowFiles with a translated FM SKU, plus unknown codes when strict mode is off")
            .build();
    static final Relationship REL_UNMATCHED = new Relationship.Builder()
            .name("unmatched")
            .description("Vendor code not found in the SKU mapping table")
            .build();
    static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("FlowFile could not be parsed or caused an exception")
            .build();

    // Static SKU lookup map (in production: use a lookup Controller Service).
    // One map covers every vendor here; VENDOR_ID would select a per-vendor table.
    private static final Map<String, String> SKU_MAP = Map.of(
            "DOLE-BAN-001", "BAN-CAVENDISH-1LB",
            "DOLE-STR-010", "STRAW-PINT-16OZ",
            "DOLE-PNA-002", "PINEAPPLE-EA",
            "DEL-APL-001", "APPL-GALA-1LB",
            "DEL-APL-002", "APPL-FUJI-1LB");

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return List.of(VENDOR_ID, STRICT_MODE);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return Set.of(REL_VALID, REL_UNMATCHED, REL_FAILURE);
    }

    @Override
    public void onTrigger(ProcessContext ctx, ProcessSession session) {
        FlowFile ff = session.get();
        if (ff == null) return;

        boolean strict = ctx.getProperty(STRICT_MODE).asBoolean();
        try {
            // Read FlowFile content as UTF-8 text
            String[] contentHolder = {null};
            session.read(ff, in -> contentHolder[0] = new String(in.readAllBytes(), StandardCharsets.UTF_8));
            JsonNode node = mapper.readTree(contentHolder[0]);

            String itemCode = node.path("item_code").asText().toUpperCase(Locale.ROOT);
            String sku = SKU_MAP.get(itemCode);
            ff = session.putAttribute(ff, "fm.vendor.code", itemCode);

            if (sku != null) {
                // Enrich the FlowFile with new attributes
                ff = session.putAttribute(ff, "fm.sku", sku);
                ff = session.putAttribute(ff, "fm.valid", "true");
                ff = session.putAttribute(ff, "fm.barcode.type", "FM-SKU-V2");
                session.transfer(ff, REL_VALID);
            } else {
                ff = session.putAttribute(ff, "fm.valid", "false");
                ff = session.putAttribute(ff, "fm.error", "Unknown vendor code: " + itemCode);
                // Strict mode rejects unknown codes; lenient mode passes them through flagged
                session.transfer(ff, strict ? REL_UNMATCHED : REL_VALID);
            }
        } catch (Exception e) {
            getLogger().error("Failed to process FlowFile {}", ff, e);
            ff = session.penalize(ff);   // back-off before retry
            session.transfer(ff, REL_FAILURE);
        }
    }
}
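The translation rule itself does not need the NiFi API. One option is to move it into a plain class that the processor delegates to, so ordinary unit tests can cover it. The sketch below is hypothetical: the SkuTranslator, Route and Result names are not part of NiFi or the FreshMart code above. It reuses the same five sample mappings and follows the routing that the Strict Validation Mode property describes:

- Known codes go to valid.
- Unknown codes go to unmatched in strict mode.
- Otherwise, unknown codes pass through flagged with a warning.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

/** Hypothetical NiFi-free core of FreshMartBarcodeValidator's translation rule. */
final class SkuTranslator {

    /** Where the processor would send the FlowFile. */
    enum Route { VALID, UNMATCHED }

    /** Outcome: route, translated SKU (if any), and whether it is a flagged pass-through. */
    record Result(Route route, Optional<String> sku, boolean warning) { }

    // Same sample mappings as the processor's SKU_MAP
    private static final Map<String, String> SKU_MAP = Map.of(
            "DOLE-BAN-001", "BAN-CAVENDISH-1LB",
            "DOLE-STR-010", "STRAW-PINT-16OZ",
            "DOLE-PNA-002", "PINEAPPLE-EA",
            "DEL-APL-001", "APPL-GALA-1LB",
            "DEL-APL-002", "APPL-FUJI-1LB");

    private SkuTranslator() { }

    static Result translate(String rawItemCode, boolean strictMode) {
        // Normalize like the processor (upper-case), plus trimming stray whitespace
        String code = rawItemCode == null ? "" : rawItemCode.trim().toUpperCase(Locale.ROOT);
        String sku = SKU_MAP.get(code);
        if (sku != null) {
            return new Result(Route.VALID, Optional.of(sku), false);
        }
        // Unknown code: strict mode rejects it; lenient mode passes it through flagged
        return strictMode
                ? new Result(Route.UNMATCHED, Optional.empty(), false)
                : new Result(Route.VALID, Optional.empty(), true);
    }
}
```

With this split, onTrigger would only parse the JSON and call SkuTranslator.translate(itemCode, strict). It would then map Route.VALID to REL_VALID and Route.UNMATCHED to REL_UNMATCHED.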
FreshMartMarginEnricher.java — reads sale events, looks up cost from a shared Controller Service, writes enriched JSON back
package com.freshmart.nifi.processors;

import org.apache.nifi.annotation.behavior.*;
import org.apache.nifi.annotation.documentation.*;
import org.apache.nifi.components.*;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.util.StandardValidators;
import com.fasterxml.jackson.databind.*;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.nio.charset.StandardCharsets;
import java.sql.*;
import java.util.*;

@Tags({"freshmart", "margin", "enrichment", "pricing"})
@CapabilityDescription("Enriches POS sale events with cost price, gross margin %, "
        + "and profit tier (HIGH/MED/LOW) via a live DB lookup. "
        + "Uses a shared DBCP Controller Service for connection pooling.")
public class FreshMartMarginEnricher extends AbstractProcessor {

    // ── Reference a SHARED Controller Service (connection pool) ──
    static final PropertyDescriptor DB_SERVICE = new PropertyDescriptor.Builder()
            .name("database-connection-pool")
            .displayName("Database Connection Pool")
            .description("A DBCPConnectionPool controller service pointing to the "
                    + "FreshMart product catalog database")
            .identifiesControllerService(DBCPService.class)
            .required(true)
            .build();

    static final PropertyDescriptor HIGH_MARGIN_THRESHOLD = new PropertyDescriptor.Builder()
            .name("high-margin-threshold")
            .displayName("High Margin Threshold (%)")
            .description("Margin % at or above which items are classified as HIGH profit tier")
            .defaultValue("40")
            .addValidator(StandardValidators.NUMBER_VALIDATOR)
            .build();

    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success").description("Enriched with margin data").build();
    static final Relationship REL_NO_COST = new Relationship.Builder()
            .name("no-cost-data").description("SKU not found in cost table").build();
    static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure").description("DB error, bad price, or parse failure").build();

    private final ObjectMapper mapper = new ObjectMapper();

    // Without these two overrides NiFi exposes no properties and no relationships
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return List.of(DB_SERVICE, HIGH_MARGIN_THRESHOLD);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return Set.of(REL_SUCCESS, REL_NO_COST, REL_FAILURE);
    }

    @Override
    public void onTrigger(ProcessContext ctx, ProcessSession session) {
        FlowFile ff = session.get();
        if (ff == null) return;

        // Retrieve the shared Controller Service
        DBCPService dbService = ctx.getProperty(DB_SERVICE).asControllerService(DBCPService.class);
        double highThreshold = ctx.getProperty(HIGH_MARGIN_THRESHOLD).asDouble();

        try {
            // Read + parse FlowFile content
            String[] content = {null};
            session.read(ff, in -> content[0] = new String(in.readAllBytes(), StandardCharsets.UTF_8));
            ObjectNode node = (ObjectNode) mapper.readTree(content[0]);
            String sku = node.path("sku").asText();
            double salePrice = node.path("price").asDouble();
            if (salePrice <= 0) {
                // Margin % divides by the sale price; 0 would yield Infinity/NaN
                throw new IllegalArgumentException("Non-positive sale price for SKU " + sku);
            }

            // DB lookup via the shared connection pool
            try (Connection conn = dbService.getConnection();
                 PreparedStatement ps = conn.prepareStatement(
                         "SELECT unit_cost FROM product_costs WHERE sku = ? AND active = true")) {
                ps.setString(1, sku);
                try (ResultSet rs = ps.executeQuery()) {
                    if (!rs.next()) {
                        ff = session.putAttribute(ff, "fm.margin.error", "No cost data for SKU: " + sku);
                        session.transfer(ff, REL_NO_COST);
                        return;
                    }
                    double cost = rs.getDouble("unit_cost");
                    double marginPct = (salePrice - cost) / salePrice * 100;
                    double marginUsd = Math.round((salePrice - cost) * 100.0) / 100.0;
                    String tier = marginPct >= highThreshold ? "HIGH" : marginPct >= 20 ? "MED" : "LOW";

                    // Write enriched JSON back to FlowFile content
                    node.put("cost_price", cost);
                    node.put("margin_pct", Math.round(marginPct * 100.0) / 100.0);
                    node.put("margin_usd", marginUsd);
                    node.put("profit_tier", tier);
                    byte[] enriched = mapper.writeValueAsBytes(node);
                    ff = session.write(ff, out -> out.write(enriched));
                    ff = session.putAttribute(ff, "fm.profit.tier", tier);
                    session.transfer(ff, REL_SUCCESS);
                }
            }
        } catch (Exception e) {
            getLogger().error("Margin enrichment failed for {}", ff, e);
            session.transfer(session.penalize(ff), REL_FAILURE);
        }
    }
}
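The margin arithmetic is worth checking on its own, because a sale price of 0 makes the division produce Infinity or NaN rather than an error. MarginMath below is a hypothetical helper that uses the same formulas as onTrigger:

- Margin % = (price - cost) / price × 100, rounded to two decimals.
- HIGH at or above the High Margin Threshold (default 40).
- MED at or above the hard-coded 20.
- LOW otherwise.

Rejecting non-positive prices is an assumption about how you want bad input handled.

```java
/** Hypothetical standalone version of FreshMartMarginEnricher's margin math. */
final class MarginMath {

    private MarginMath() { }

    /** Gross margin as a percentage of the sale price, rounded to 2 decimals. */
    static double marginPct(double salePrice, double unitCost) {
        if (salePrice <= 0) {
            throw new IllegalArgumentException("sale price must be positive: " + salePrice);
        }
        double pct = (salePrice - unitCost) / salePrice * 100;
        return Math.round(pct * 100.0) / 100.0;
    }

    /** Profit tier using the processor's rules: HIGH >= threshold, MED >= 20, else LOW. */
    static String tier(double marginPct, double highThreshold) {
        if (marginPct >= highThreshold) return "HIGH";
        if (marginPct >= 20) return "MED";
        return "LOW";
    }
}
```

Take the banana sale from the Section 0 example ($0.99) with an assumed unit cost of $0.42. marginPct(0.99, 0.42) gives 57.58, so tier(57.58, 40) returns HIGH.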
FreshMartRecallRouter.java — stateful processor that counts complaints per SKU within a time window, triggers recalls automatically
package com.freshmart.nifi.processors;

import org.apache.nifi.annotation.behavior.*;
import org.apache.nifi.annotation.documentation.*;
import org.apache.nifi.components.*;
import org.apache.nifi.components.state.*;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@Tags({"freshmart", "recall", "complaints", "routing"})
@CapabilityDescription("Counts HIGH severity complaints per SKU within a "
        + "sliding time window using NiFi State Management. When the count "
        + "reaches the threshold, routes that complaint to the 'recall' "
        + "relationship once per threshold crossing. Uses cluster-scoped state for multi-node clusters.")
@Stateful(scopes = {Scope.CLUSTER},
        description = "Maintains complaint timestamps per SKU across nodes and restarts")
@TriggerSerially   // one thread per node, so the state read-modify-write cannot interleave locally
public class FreshMartRecallRouter extends AbstractProcessor {

    static final PropertyDescriptor RECALL_THRESHOLD = new PropertyDescriptor.Builder()
            .name("recall-threshold")
            .displayName("Complaint Threshold for Recall")
            .description("Number of HIGH severity complaints within the window to trigger a recall")
            .defaultValue("3")
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .build();

    static final PropertyDescriptor WINDOW_MINUTES = new PropertyDescriptor.Builder()
            .name("time-window-minutes")
            .displayName("Time Window (minutes)")
            .description("Complaints older than this window are not counted")
            .defaultValue("30")
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .build();

    static final Relationship REL_RECALL = new Relationship.Builder()
            .name("recall")
            .description("Threshold reached: trigger a product recall flow").build();
    static final Relationship REL_LOG = new Relationship.Builder()
            .name("log")
            .description("Below threshold: route to CRM complaint logging").build();

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return List.of(RECALL_THRESHOLD, WINDOW_MINUTES);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return Set.of(REL_RECALL, REL_LOG);
    }

    @Override
    public void onTrigger(ProcessContext ctx, ProcessSession session) throws ProcessException {
        FlowFile ff = session.get();
        if (ff == null) return;

        int threshold = ctx.getProperty(RECALL_THRESHOLD).asInteger();
        long windowMs = TimeUnit.MINUTES.toMillis(ctx.getProperty(WINDOW_MINUTES).asLong());

        try {
            String[] content = {null};
            session.read(ff, in -> content[0] = new String(in.readAllBytes(), StandardCharsets.UTF_8));
            JsonNode node = mapper.readTree(content[0]);
            String sku = node.path("sku").asText();
            String severity = node.path("severity").asText();
            long now = System.currentTimeMillis();

            if (!"HIGH".equals(severity)) {
                // Not HIGH: pass straight through to logging
                session.transfer(ff, REL_LOG);
                return;
            }

            // ── NiFi State Management (survives restarts, shared across cluster) ──
            // setState is last-writer-wins: two nodes updating at the same moment can drop a
            // timestamp. Use StateManager.replace(oldStateMap, newMap, scope) for compare-and-set.
            StateManager stateManager = ctx.getStateManager();
            Map<String, String> currentState = new HashMap<>(stateManager.getState(Scope.CLUSTER).toMap());

            // State key: "sku:STRAW-PINT-16OZ:timestamps" → comma-separated epoch ms
            String tsKey = "sku:" + sku + ":timestamps";
            String existing = currentState.getOrDefault(tsKey, "");

            // Keep timestamps inside the window, then add the current complaint
            List<Long> times = new ArrayList<>();
            if (!existing.isEmpty()) {
                for (String t : existing.split(",")) {
                    long ts = Long.parseLong(t);
                    if (now - ts <= windowMs) times.add(ts);
                }
            }
            times.add(now);
            currentState.put(tsKey, times.stream().map(Object::toString).collect(Collectors.joining(",")));
            stateManager.setState(currentState, Scope.CLUSTER);

            // The count rises by at most one per complaint, so "==" fires once per crossing
            boolean recall = times.size() == threshold;
            ff = session.putAttribute(ff, "fm.complaint.count", String.valueOf(times.size()));
            ff = session.putAttribute(ff, "fm.recall.triggered", String.valueOf(recall));
            session.transfer(ff, recall ? REL_RECALL : REL_LOG);
        } catch (Exception e) {
            getLogger().error("RecallRouter failed for {}", ff, e);
            session.transfer(session.penalize(ff), REL_LOG);
        }
    }
}
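Here is the sliding-window bookkeeping pulled out of the processor, so it can be exercised without a StateManager. ComplaintWindow and addComplaint are hypothetical names. The state string uses the same comma-separated epoch-millisecond format as the processor's sku:<SKU>:timestamps key.

Pruning only removes entries and each call adds exactly one, so the count rises by at most one per complaint. That is why testing count == threshold (rather than >=) fires the recall once per threshold crossing, which is what the capability description promises.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical NiFi-free core of FreshMartRecallRouter's sliding-window count. */
final class ComplaintWindow {

    /** New state string to persist, the in-window count, and whether to route to recall. */
    record Update(String newState, int count, boolean recall) { }

    private ComplaintWindow() { }

    /** Records one HIGH complaint; state is comma-separated epoch ms for one SKU ("" if none). */
    static Update addComplaint(String state, long nowMs, long windowMs, int threshold) {
        List<Long> kept = new ArrayList<>();
        if (!state.isEmpty()) {
            for (String t : state.split(",")) {
                long ts = Long.parseLong(t.trim());
                if (nowMs - ts <= windowMs) {
                    kept.add(ts);                 // still inside the window
                }
            }
        }
        kept.add(nowMs);
        String newState = kept.stream().map(Object::toString).collect(Collectors.joining(","));
        // Count grows by at most one per call, so equality marks the single crossing
        return new Update(newState, kept.size(), kept.size() == threshold);
    }
}
```

Consider a 30-minute window and a threshold of 3, with complaints at minutes 0, 10 and 20. The third complaint fires the recall and a fourth at minute 25 does not. By minute 60 every earlier timestamp has aged out, so the count starts again at 1.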
FreshMartVendorConnector.java — polls a vendor REST API for shipment status on the primary node and backs off (yields) on errors
package com.freshmart.nifi.processors;

import org.apache.nifi.annotation.behavior.*;
import org.apache.nifi.annotation.documentation.*;
import org.apache.nifi.annotation.lifecycle.*;
import org.apache.nifi.components.*;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.util.StandardValidators;
import java.net.*;
import java.net.http.*;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.TimeUnit;

@Tags({"freshmart", "vendor", "connector", "REST", "shipment"})
@CapabilityDescription("Polls a vendor's REST API for shipment status updates "
        + "using an API key header. Writes each response body to one FlowFile "
        + "with vendor metadata attributes; non-200 responses go to 'failure' "
        + "and the processor yields before the next poll.")
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)   // source processor
@TriggerSerially   // only one thread can run this at a time
@PrimaryNodeOnly   // only runs on the cluster's primary node
public class FreshMartVendorConnector extends AbstractProcessor {

    static final PropertyDescriptor VENDOR_API_URL = new PropertyDescriptor.Builder()
            .name("vendor-api-url")
            .displayName("Vendor API Base URL")
            .description("REST endpoint for vendor shipment data, e.g. https://api.dole.com/v2/shipments")
            .required(true)
            .addValidator(StandardValidators.URL_VALIDATOR)
            .build();

    static final PropertyDescriptor API_KEY = new PropertyDescriptor.Builder()
            .name("api-key")
            .displayName("API Key")
            .description("Vendor API authentication key. Stored securely in NiFi's sensitive properties.")
            .required(true)
            .sensitive(true)   // ← NiFi masks this in the UI and encrypts at rest
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    static final PropertyDescriptor VENDOR_CODE = new PropertyDescriptor.Builder()
            .name("vendor-code")
            .displayName("Vendor Code")
            .description("FreshMart vendor identifier: DOLE, DEL-MONTE, CHIQUITA")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
            .name("request-timeout")
            .displayName("Request Timeout")
            .description("HTTP request timeout")
            .defaultValue("30 sec")
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .build();

    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success").description("Shipment events fetched successfully").build();
    static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure").description("API returned a non-200 status (response body kept)").build();

    private volatile HttpClient httpClient;

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return List.of(VENDOR_API_URL, API_KEY, VENDOR_CODE, TIMEOUT);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return Set.of(REL_SUCCESS, REL_FAILURE);
    }

    // @OnScheduled runs once when the processor starts (not per FlowFile)
    @OnScheduled
    public void setup(ProcessContext ctx) {
        httpClient = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(10))
                .followRedirects(HttpClient.Redirect.NORMAL)
                .build();
        getLogger().info("VendorConnector HTTP client initialized");
    }

    @OnStopped
    public void cleanup() {
        httpClient = null;
    }

    @Override
    public void onTrigger(ProcessContext ctx, ProcessSession session) {
        String url = ctx.getProperty(VENDOR_API_URL).getValue();
        String apiKey = ctx.getProperty(API_KEY).getValue();
        // evaluateAttributeExpressions() is required for Expression Language to resolve
        String vendor = ctx.getProperty(VENDOR_CODE).evaluateAttributeExpressions().getValue();
        long timeoutMs = ctx.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);

        try {
            String query = "?vendor=" + URLEncoder.encode(vendor, StandardCharsets.UTF_8) + "&since=latest";
            HttpRequest req = HttpRequest.newBuilder()
                    .uri(URI.create(url + query))
                    .header("X-API-Key", apiKey)
                    .header("Accept", "application/json")
                    .timeout(Duration.ofMillis(timeoutMs))
                    .GET()
                    .build();
            HttpResponse<String> resp = httpClient.send(req, HttpResponse.BodyHandlers.ofString());

            // Create one FlowFile with the full API response
            FlowFile ff = session.create();
            final byte[] body = resp.body().getBytes(StandardCharsets.UTF_8);
            ff = session.write(ff, out -> out.write(body));
            ff = session.putAttribute(ff, "vendor.code", vendor);
            ff = session.putAttribute(ff, "vendor.api.status", String.valueOf(resp.statusCode()));
            ff = session.putAttribute(ff, "mime.type", "application/json");
            ff = session.putAttribute(ff, "fetch.timestamp", String.valueOf(System.currentTimeMillis()));

            if (resp.statusCode() != 200) {
                getLogger().warn("Vendor API returned status {}", resp.statusCode());
                session.transfer(ff, REL_FAILURE);
                ctx.yield();   // back-off before next trigger
                return;
            }
            session.transfer(ff, REL_SUCCESS);
            // No explicit commit: AbstractProcessor commits the session after onTrigger returns
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve the interrupt for NiFi's thread pool
            ctx.yield();
        } catch (Exception e) {
            getLogger().error("Vendor API call failed: {}", e.getMessage(), e);
            session.rollback();   // drop any partially built FlowFile
            ctx.yield();
        }
    }
}
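The connector backs off with ctx.yield(), which pauses the processor for its fixed Yield Duration regardless of how many polls have failed in a row. If you want the wait to grow with consecutive failures, a capped exponential schedule is a common pattern.

BackoffPolicy below is a hypothetical sketch that only computes the delay. How the processor would apply it is left as an assumption; one option is to count failures in a field and return early from onTrigger until the delay has passed. Jitter is omitted to keep the numbers predictable.

```java
import java.time.Duration;

/** Hypothetical capped exponential back-off schedule (no jitter). */
final class BackoffPolicy {
    private final Duration base;
    private final Duration cap;

    BackoffPolicy(Duration base, Duration cap) {
        this.base = base;
        this.cap = cap;
    }

    /** Delay after the given number of consecutive failures (0 = first failure). */
    Duration delayFor(int consecutiveFailures) {
        // Clamp the exponent so the shift cannot overflow a long
        int n = Math.min(Math.max(consecutiveFailures, 0), 30);
        long millis = base.toMillis() * (1L << n);
        return millis >= cap.toMillis() ? cap : Duration.ofMillis(millis);
    }
}
```

With a 1 s base and a 1 min cap, the delays run 1 s, 2 s, 4 s, 8 s, 16 s and 32 s, then stay at 60 s.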
FreshMartLookupService.java — custom Controller Service interface + implementation shared across processors
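// ── INTERFACE ─────────────────────────────────
package com.freshmart.nifi.services;

import org.apache.nifi.annotation.documentation.*;
import org.apache.nifi.controller.ControllerService;
import java.util.Optional;

@Tags({"freshmart", "lookup", "vendor"})
@CapabilityDescription("Provides SKU-to-product lookup "
        + "and vendor code translation for FreshMart processors")
public interface FreshMartLookupService extends ControllerService {

    /** Minimal product view (extend with name, vendor, etc. as needed) */
    record ProductDetails(String sku, double unitCost) { }

    /** Translate vendor item code to FM SKU */
    Optional<String> lookupSKU(String vendorCode);

    /** Get product details by FM SKU */
    Optional<ProductDetails> getProduct(String sku);

    /** Get unit cost for margin calculation */
    Optional<Double> getUnitCost(String sku);

    /** True if the service cache is warmed up */
    boolean isReady();
}

// ── IMPLEMENTATION (FreshMartLookupServiceImpl.java) ─────────────────
package com.freshmart.nifi.services;

import org.apache.nifi.annotation.documentation.*;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.util.StandardValidators;
import java.sql.*;
import java.util.*;
import java.util.concurrent.TimeUnit;

@Tags({"freshmart", "lookup", "cache"})
public class FreshMartLookupServiceImpl extends AbstractControllerService
        implements FreshMartLookupService {

    static final PropertyDescriptor JDBC_SERVICE = new PropertyDescriptor.Builder()
            .name("database-connection-pool")
            .displayName("Database Connection Pool")
            .identifiesControllerService(DBCPService.class)
            .required(true)
            .build();

    static final PropertyDescriptor CACHE_TTL = new PropertyDescriptor.Builder()
            .name("cache-ttl")
            .displayName("Cache TTL")
            .defaultValue("10 min")
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .build();

    private volatile DBCPService dbcp;
    private volatile long ttlMs;
    private volatile Map<String, String> skuCache = Map.of();
    private volatile Map<String, Double> costCache = Map.of();
    private volatile long cacheExpiry = 0;
    private volatile boolean ready = false;

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return List.of(JDBC_SERVICE, CACHE_TTL);
    }

    @OnEnabled   // called when the service is enabled in the NiFi UI
    public void onEnabled(ConfigurationContext ctx) {
        dbcp = ctx.getProperty(JDBC_SERVICE).asControllerService(DBCPService.class);
        ttlMs = ctx.getProperty(CACHE_TTL).asTimePeriod(TimeUnit.MILLISECONDS);
        warmCache();
        getLogger().info("FreshMart lookup service enabled, {} SKUs cached", skuCache.size());
    }

    /** Reloads both caches; on failure keeps the previous snapshot and retries after the TTL. */
    private synchronized void warmCache() {
        try (Connection c = dbcp.getConnection();
             Statement stmt = c.createStatement();
             ResultSet rs = stmt.executeQuery(
                     "SELECT vendor_code, fm_sku, unit_cost FROM product_costs WHERE active=true")) {
            Map<String, String> newSku = new HashMap<>();
            Map<String, Double> newCost = new HashMap<>();
            while (rs.next()) {
                newSku.put(rs.getString(1).toUpperCase(Locale.ROOT), rs.getString(2));
                newCost.put(rs.getString(2), rs.getDouble(3));
            }
            skuCache = Collections.unmodifiableMap(newSku);
            costCache = Collections.unmodifiableMap(newCost);
            ready = true;
        } catch (Exception e) {
            getLogger().error("Cache warm failed", e);
        }
        cacheExpiry = System.currentTimeMillis() + ttlMs;
    }

    private void refreshIfExpired() {
        if (System.currentTimeMillis() >= cacheExpiry) {
            synchronized (this) {
                if (System.currentTimeMillis() >= cacheExpiry) warmCache();
            }
        }
    }

    @Override
    public Optional<String> lookupSKU(String vendorCode) {
        refreshIfExpired();
        return Optional.ofNullable(skuCache.get(vendorCode.toUpperCase(Locale.ROOT)));
    }

    @Override
    public Optional<ProductDetails> getProduct(String sku) {
        return getUnitCost(sku).map(cost -> new ProductDetails(sku, cost));
    }

    @Override
    public Optional<Double> getUnitCost(String sku) {
        refreshIfExpired();
        return Optional.ofNullable(costCache.get(sku));
    }

    @Override
    public boolean isReady() {
        return ready;
    }
}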
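The lookup service keeps a snapshot of product_costs in memory and uses the Cache TTL property to decide when it is stale. Below is a minimal, NiFi-free sketch of that pattern. TtlSnapshotCache is hypothetical. The loader Supplier stands in for the JDBC query, and the injectable LongSupplier clock exists only so expiry can be tested without sleeping. Readers take a lock only after the snapshot has expired, and they always see either the old map or the new one, never a half-filled map.

```java
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical TTL snapshot cache: reload everything at once, swap it in atomically. */
final class TtlSnapshotCache<K, V> {
    private final Supplier<Map<K, V>> loader;   // stands in for the product_costs query
    private final long ttlMs;
    private final LongSupplier clock;           // System::currentTimeMillis in production
    private volatile Map<K, V> snapshot = Map.of();
    private volatile long expiresAt = Long.MIN_VALUE;   // forces a load on first use

    TtlSnapshotCache(Supplier<Map<K, V>> loader, long ttlMs, LongSupplier clock) {
        this.loader = loader;
        this.ttlMs = ttlMs;
        this.clock = clock;
    }

    V get(K key) {
        refreshIfExpired();
        return snapshot.get(key);
    }

    private void refreshIfExpired() {
        if (clock.getAsLong() < expiresAt) {
            return;                                 // fast path: no locking while fresh
        }
        synchronized (this) {
            long now = clock.getAsLong();
            if (now < expiresAt) {
                return;                             // another thread already reloaded
            }
            snapshot = Map.copyOf(loader.get());    // immutable copy, published in one write
            expiresAt = now + ttlMs;
        }
    }
}
```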
FreshMartBarcodeValidatorTest.java — NiFi TestRunner framework, no running NiFi instance needed
package com.freshmart.nifi.processors;

import org.apache.nifi.util.*;
import org.junit.jupiter.api.*;
import static org.junit.jupiter.api.Assertions.*;
import java.nio.charset.StandardCharsets;
import java.util.*;

class FreshMartBarcodeValidatorTest {

    // NiFi's TestRunner emulates the entire processor lifecycle
    private TestRunner runner;

    @BeforeEach
    void setUp() {
        runner = TestRunners.newTestRunner(FreshMartBarcodeValidator.class);
        // Set property values just like the NiFi UI
        runner.setProperty(FreshMartBarcodeValidator.VENDOR_ID, "DOLE");
        runner.setProperty(FreshMartBarcodeValidator.STRICT_MODE, "true");
    }

    @Test
    void validDoleBananaCode_routesToValid() {
        String input = "{\"item_code\":\"DOLE-BAN-001\",\"qty\":120,\"unit_price\":0.32}";
        // Enqueue a FlowFile with content
        runner.enqueue(input.getBytes(StandardCharsets.UTF_8));
        runner.run(1);

        // Assert exactly 1 FlowFile was routed to 'valid'
        runner.assertTransferCount("valid", 1);
        runner.assertTransferCount("unmatched", 0);
        runner.assertTransferCount("failure", 0);

        // Check FlowFile attributes were set correctly
        MockFlowFile result = runner.getFlowFilesForRelationship("valid").get(0);
        result.assertAttributeEquals("fm.sku", "BAN-CAVENDISH-1LB");
        result.assertAttributeEquals("fm.valid", "true");
        result.assertAttributeEquals("fm.barcode.type", "FM-SKU-V2");
    }

    @Test
    void unknownVendorCode_routesToUnmatched() {
        runner.enqueue("{\"item_code\":\"UNKNOWN-999\",\"qty\":5}".getBytes());
        runner.run(1);
        runner.assertTransferCount("unmatched", 1);
        MockFlowFile result = runner.getFlowFilesForRelationship("unmatched").get(0);
        result.assertAttributeEquals("fm.valid", "false");
        assertTrue(result.getAttribute("fm.error").contains("UNKNOWN-999"));
    }

    @Test
    void malformedJson_routesToFailure() {
        runner.enqueue("not valid json!".getBytes());
        runner.run(1);
        runner.assertTransferCount("failure", 1);
    }

    @Test
    void batchProcessing_allItemsRouted() {
        for (String code : List.of("DOLE-BAN-001", "DOLE-STR-010", "DEL-APL-001")) {
            runner.enqueue(("{\"item_code\":\"" + code + "\"}").getBytes());
        }
        runner.run(3);
        runner.assertTransferCount("valid", 3);
        runner.assertTransferCount("unmatched", 0);
    }

    // ── Run with: mvn test -pl nifi-freshmart-processors ──
}
📄 nifi-freshmart-nar/pom.xml — Packages processors + services into a deployable .nar file
<!-- nifi-freshmart-nar/pom.xml -->
<project>
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.freshmart.nifi</groupId>
    <artifactId>freshmart-nifi-processors</artifactId>
    <version>1.0.0</version>
  </parent>
  <artifactId>nifi-freshmart-nar</artifactId>
  <packaging>nar</packaging>  <!-- Special Maven packaging, provided by the plugin below -->

  <dependencies>
    <!-- Include processor module -->
    <dependency>
      <groupId>com.freshmart.nifi</groupId>
      <artifactId>nifi-freshmart-processors</artifactId>
      <version>${project.version}</version>
    </dependency>
    <!-- Include services module -->
    <dependency>
      <groupId>com.freshmart.nifi</groupId>
      <artifactId>nifi-freshmart-services</artifactId>
      <version>${project.version}</version>
    </dependency>
    <!-- Parent NAR: gives this NAR's classloader the standard service APIs (e.g. DBCPService) -->
    <dependency>
      <groupId>org.apache.nifi</groupId>
      <artifactId>nifi-standard-services-api-nar</artifactId>
      <version>${nifi.version}</version>
      <type>nar</type>
    </dependency>
    <!-- Jackson for JSON processing (bundled inside the NAR) -->
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.16.1</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-nar-maven-plugin</artifactId>
        <version>1.5.1</version>
        <extensions>true</extensions>
      </plugin>
    </plugins>
  </build>
</project>
📋 META-INF/services — Registers processors with NiFi's ServiceLoader
# nifi-freshmart-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
# One fully-qualified class name per line:
com.freshmart.nifi.processors.FreshMartBarcodeValidator
com.freshmart.nifi.processors.FreshMartMarginEnricher
com.freshmart.nifi.processors.FreshMartRecallRouter
com.freshmart.nifi.processors.FreshMartVendorConnector

# For Controller Services (in the services module):
# nifi-freshmart-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
com.freshmart.nifi.services.FreshMartLookupServiceImpl

# ── Build and install ──────────────────────────
mvn clean package -DskipTests
cp nifi-freshmart-nar/target/nifi-freshmart-nar-1.0.0.nar \
   $NIFI_HOME/lib/

# NARs in lib/ are only loaded at startup, so restart NiFi
# (NiFi 1.9+ can also hot-load new NARs dropped into nifi.nar.library.autoload.directory)
$NIFI_HOME/bin/nifi.sh restart

# ── Verify in NiFi UI ──────────────────────────
# Add Processor → search "FreshMart"
# All 4 processors appear with full descriptions,
# property panels, relationship tabs, and tooltips.
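These files use the standard java.util.ServiceLoader provider-configuration format: one fully-qualified class name per line, with '#' starting a comment and surrounding whitespace ignored. A misspelled entry only shows up when NiFi loads the NAR, so a pre-deploy check can save a restart.

ServiceFileCheck is a hypothetical helper that validates the name syntax against an approximate identifier pattern. It does not load the classes, so it cannot catch a name that is well-formed but points at the wrong class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical syntax check for META-INF/services provider-configuration files. */
final class ServiceFileCheck {
    // Approximates dotted Java identifiers, e.g. com.freshmart.nifi.processors.FreshMartRecallRouter
    private static final Pattern FQCN =
            Pattern.compile("[\\p{L}_$][\\p{L}\\p{N}_$]*(\\.[\\p{L}_$][\\p{L}\\p{N}_$]*)+");

    private ServiceFileCheck() { }

    /** Returns the class names listed in the file, or throws on a malformed entry. */
    static List<String> parse(String fileContents) {
        List<String> names = new ArrayList<>();
        for (String line : fileContents.split("\\R")) {
            int hash = line.indexOf('#');
            String entry = (hash >= 0 ? line.substring(0, hash) : line).trim();
            if (entry.isEmpty()) {
                continue;                           // blank or comment-only line
            }
            if (!FQCN.matcher(entry).matches()) {
                throw new IllegalArgumentException("Not a valid class name: '" + entry + "'");
            }
            names.add(entry);
        }
        return names;
    }
}
```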
Section 11 — Administration

🛠️ NiFi Administration

Everything the NiFi Administrator is responsible for — installation, cluster setup, security, user management, monitoring, backup, and performance tuning — with real FreshMart commands and configs.

🎯
Core Admin Duties

What the NiFi Admin Owns

🖥️
Installation & Upgrades
Install NiFi, manage versions, perform rolling zero-downtime upgrades across the cluster.
🔐
Security & TLS
Generate keystores, configure LDAP/OIDC auth, manage RBAC policies, rotate certificates annually.
👥
Users & Groups
Create users/groups, assign access policies per component, scope store manager views.
📦
Flow Deployment
Manage NiFi Registry, promote flows dev→staging→prod, maintain version history.
📊
Monitoring & Alerting
Watch bulletins, queue depths, disk, JVM heap. Alert on-call when errors fire.
💾
Backup & Recovery
Daily config backups to S3, provenance archiving, DR drill each quarter.
Admin Task 1 — Installation

🚀 Installing NiFi (Docker & Linux)

DOCKER — DEV / DEMO
# Pull and start NiFi (single-user auth, quickest start)
docker run --name freshmart-nifi \
  -p 8443:8443 \
  -e SINGLE_USER_CREDENTIALS_USERNAME=admin \
  -e SINGLE_USER_CREDENTIALS_PASSWORD='FreshMart2024!' \
  -d apache/nifi:1.24.0

# Wait ~90 seconds, then open (macOS; use xdg-open on Linux):
open https://localhost:8443/nifi

# Confirm started:
docker logs freshmart-nifi | grep "NiFi has started"
LINUX BARE METAL — PRODUCTION
# Java 17 matches <java.version> in the parent POM
sudo apt-get install -y openjdk-17-jdk
# Older releases such as 1.24.0 are served from the Apache archive
wget https://archive.apache.org/dist/nifi/1.24.0/nifi-1.24.0-bin.zip
sudo unzip nifi-1.24.0-bin.zip -d /opt/ && sudo ln -s /opt/nifi-1.24.0 /opt/nifi

# Set admin credentials (password must be at least 12 characters)
sudo /opt/nifi/bin/nifi.sh set-single-user-credentials admin 'FreshMart2024!'

# Register as a service (creates /etc/init.d/nifi) & start
sudo /opt/nifi/bin/nifi.sh install
sudo systemctl enable nifi && sudo systemctl start nifi
tail -f /opt/nifi/logs/nifi-app.log
Admin Task 2 — Security

🔐 LDAP / Active Directory Auth

<!-- /opt/nifi/conf/login-identity-providers.xml -->
<!-- Activate it in nifi.properties: nifi.security.user.login.identity.provider=ldap-provider -->
<provider>
    <identifier>ldap-provider</identifier>
    <class>org.apache.nifi.ldap.LdapProvider</class>
    <property name="Authentication Strategy">SIMPLE</property>
    <property name="Manager DN">CN=svc-nifi,OU=ServiceAccounts,DC=freshmart,DC=internal</property>
    <property name="Manager Password">ServiceAcctPass!</property>
    <property name="Referral Strategy">FOLLOW</property>
    <property name="Url">ldap://ad.freshmart.internal:389</property>
    <property name="User Search Base">OU=Users,DC=freshmart,DC=internal</property>
    <property name="User Search Filter">sAMAccountName={0}</property>
    <property name="Identity Strategy">USE_USERNAME</property>
    <property name="Authentication Expiration">12 hours</property>
</provider>
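At login, the {0} in User Search Filter is replaced with the name the user typed. If you build LDAP filters yourself in custom code (say, a hypothetical AD-backed lookup service), the substituted value must be escaped per RFC 4515; otherwise input such as * would match every account.

LdapFilter below is an illustrative helper, not part of NiFi, and not something you configure in login-identity-providers.xml. It escapes the five characters RFC 4515 requires: \, *, (, ) and NUL.

```java
/** Hypothetical helper: RFC 4515 escaping for values placed into LDAP search filters. */
final class LdapFilter {

    private LdapFilter() { }

    /** Escapes the characters RFC 4515 requires as backslash + two hex digits. */
    static String escape(String value) {
        StringBuilder sb = new StringBuilder(value.length());
        for (char c : value.toCharArray()) {
            switch (c) {
                case '\\' -> sb.append("\\5c");
                case '*' -> sb.append("\\2a");
                case '(' -> sb.append("\\28");
                case ')' -> sb.append("\\29");
                case '\0' -> sb.append("\\00");
                default -> sb.append(c);
            }
        }
        return sb.toString();
    }

    /** Replaces {0} in a template such as "sAMAccountName={0}" with the escaped value. */
    static String fill(String template, String value) {
        return template.replace("{0}", escape(value));
    }
}
```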

👥 FreshMart RBAC Role Matrix

Role | NiFi Permissions | FreshMart Users
NiFi Admin | All permissions. Create users, manage policies, install NARs | alice@freshmart.com (Platform Eng)
Flow Developer | View + Modify + Operate all Process Groups. Cannot manage users | bchen@freshmart.com, carol@freshmart.com
Flow Operator | Start/Stop processors. View all. Cannot edit configs | ops-team@freshmart.com (DevOps on-call)
Store Manager Viewer | View only their store's Process Group. No configs, no data download | store-managers AD group (50 users)
Data Analyst | View canvas + provenance. Cannot download FlowFile content | analytics-team AD group
Auditor | Provenance only. No canvas, no configs, no content | compliance@freshmart.com
Admin Task 3 — Monitoring

📊 What to Monitor & How to Alert

KEY METRICS TABLE
Metric | Alert Threshold
Queue depth | >80% of back-pressure limit
ERROR bulletins | Any ERROR = page on-call immediately
JVM heap | >80% for 5+ min
DLQ depth | freshmart-dlq > 0 messages
Content repo disk | 70% full on /data/nifi/content
Active threads | Maxed out for 10+ min
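The table reduces to a handful of comparisons. AlertRules is a hypothetical sketch that evaluates one snapshot of values. Gathering those values (NiFi REST API, JMX, CloudWatch, SQS queue attributes) is not shown. The duration rules are modeled as minutes-over-limit counters that the collector maintains, which is an assumption. NiFi's default back-pressure object threshold is 10,000 FlowFiles per connection, so on a default connection the queue rule fires above 8,000 queued FlowFiles.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical evaluation of the alert table against one metrics snapshot. */
final class AlertRules {

    /** Values gathered by an external collector (not shown). */
    record Snapshot(long queuedCount, long backPressureObjectThreshold,
                    int errorBulletins, int minutesHeapAbove80Pct,
                    long dlqMessages, double contentRepoUsedPct,
                    int minutesThreadsMaxed) { }

    private AlertRules() { }

    /** Names of the rules that fire, in table order. */
    static List<String> firing(Snapshot s) {
        List<String> alerts = new ArrayList<>();
        if (s.queuedCount() > 0.8 * s.backPressureObjectThreshold()) alerts.add("queue-depth");
        if (s.errorBulletins() > 0) alerts.add("error-bulletins");   // page on-call
        if (s.minutesHeapAbove80Pct() >= 5) alerts.add("jvm-heap");
        if (s.dlqMessages() > 0) alerts.add("dlq-depth");
        if (s.contentRepoUsedPct() >= 70) alerts.add("content-repo-disk");
        if (s.minutesThreadsMaxed() >= 10) alerts.add("active-threads");
        return alerts;
    }
}
```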
BULLETIN CHECK SCRIPT (CRON EVERY 5 MIN)
# $NIFI_URL is the API base, e.g. https://nifi.freshmart.internal:8443/nifi-api
TOKEN=$(curl -s -X POST "$NIFI_URL/access/token" \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d 'username=monitor&password=Pass!')

BULLETINS=$(curl -s "$NIFI_URL/flow/bulletin-board" \
  -H "Authorization: Bearer $TOKEN")

# The bulletin-board endpoint has no level filter, so count ERROR bulletins here
COUNT=$(echo "$BULLETINS" | python3 -c \
  "import sys, json; b = json.load(sys.stdin); print(sum(1 for e in b['bulletinBoard']['bulletins'] if e.get('bulletin', {}).get('level') == 'ERROR'))")

if [ "$COUNT" -gt 0 ]; then
  aws sqs send-message \
    --queue-url "$OPS_ALERTS_URL" \
    --message-body "{\"source\":\"nifi\",\"count\":$COUNT}"
fi
Admin Task 4 — Backup

💾 Backup & Restore

Back up after every deploy. Configs are small (<1MB) but losing them is catastrophic.

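What | Path
Flow config | /opt/nifi/conf/flow.json.gz
Properties | /opt/nifi/conf/nifi.properties
Users & Policies | /opt/nifi/conf/users.xml, /opt/nifi/conf/authorizations.xml
Keystores | keystore.jks, truststore.jks (locations set by nifi.security.keystore / nifi.security.truststore)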
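A minimal sketch of the "back up after every deploy" step: zip the files from the table into one archive that an existing S3 upload job (not shown) can ship. ConfigBackup is hypothetical and assumes all six files sit in one conf directory. That holds by default for everything except the keystores, whose paths come from nifi.properties. The helper skips missing files and returns how many it archived, so the caller can treat a short count as a failed backup.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

/** Hypothetical config backup: zip the NiFi files listed in the backup table. */
final class ConfigBackup {
    // File names from the backup table, assumed to live directly in confDir
    static final List<String> FILES = List.of(
            "flow.json.gz", "nifi.properties", "users.xml",
            "authorizations.xml", "keystore.jks", "truststore.jks");

    private ConfigBackup() { }

    /** Zips the files that exist in confDir into target; returns how many were added. */
    static int backup(Path confDir, Path target) throws IOException {
        int added = 0;
        try (OutputStream out = Files.newOutputStream(target);
             ZipOutputStream zip = new ZipOutputStream(out)) {
            for (String name : FILES) {
                Path file = confDir.resolve(name);
                if (!Files.isRegularFile(file)) {
                    continue;                       // caller decides whether a gap is fatal
                }
                zip.putNextEntry(new ZipEntry(name));
                Files.copy(file, zip);              // stream file bytes into the entry
                zip.closeEntry();
                added++;
            }
        }
        return added;
    }
}
```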
Admin Task 5 — Performance

⚡ JVM & Thread Tuning

Tune for your workload. FreshMart uses medium sizing (1k–10k FlowFiles/sec).

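# /opt/nifi/conf/bootstrap.conf (NiFi sets heap via java.arg.* entries)
# Light  (<1k/sec):    -Xms2g -Xmx4g
# Medium (1k-10k/sec): -Xms4g -Xmx8g   ← FM prod
# Heavy  (10k+/sec):   -Xms8g -Xmx16g
java.arg.2=-Xms4g
java.arg.3=-Xmx8g
java.arg.13=-XX:+UseG1GC
java.arg.g1pause=-XX:MaxGCPauseMillis=200

# Threads: Global Menu → Controller Settings → Maximum Timer Driven Thread Count = 20
# (stored in the flow, not in nifi.properties)

# nifi.properties: FlowFiles per queue held in memory before swapping to disk
nifi.queue.swap.threshold=20000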
Section 12 — User Roles

👥 NiFi Users — Roles, Personas & Daily Tasks

Five FreshMart personas, plus the NiFi Admin covered in Section 11. Each has different permissions and a different day-to-day experience with NiFi.

👩‍💻
Flow Developer
Builds & deploys pipelines
Modify + Operate
🔧
Flow Operator
Runs & monitors flows. On-call.
Operate only
📊
Data Analyst
Views canvas & provenance
View + Provenance
🏪
Store Manager
Views only their store group
Scoped View
📋
Auditor
Provenance records only
Provenance only
🛠️
NiFi Admin
All permissions. See Section 11.
All Access
👨‍💻
Persona 1 — Flow Developer

Bob Chen — Data Engineering

bchen@freshmart.com · Can build, deploy, and test flows · Cannot manage users

Bob builds FreshMart's data pipelines. He works in the dev NiFi environment and promotes to production via NiFi Registry. His Monday: build the Cold Chain Monitoring flow from scratch.

BOB'S TASK: BUILD COLD CHAIN MONITORING
1
GetHTTP: Configure URL: http://sensors.freshmart.internal/api/temps. Schedule: Timer Driven, every 2 min.
2
SplitRecord: CSVReader → JsonRecordSetWriter. Records per Split: 1. One sensor reading per FlowFile.
3
RouteOnAttribute: Property too_warm = ${temperature:gt(40)} routes fridge failures immediately. RouteOnAttribute only sees attributes, so first add EvaluateJsonPath (Destination: flowfile-attribute) to copy $.temperature and $.sensor_id into attributes.
4
PutSQS (too_warm route): Queue URL: the freshmart-urgent-alerts.fifo queue. Message Group ID: ${sensor_id}
5
PutDatabaseRecord (too_warm and unmatched routes, so every reading is logged): Statement Type UPSERT into COLD_CHAIN_LOG. Verify records appear in PostgreSQL.
6
Commit to Registry: Right-click Process Group → Version → Start version control (first commit) or Commit local changes. Message: "Cold chain v1: alerts >40°F"
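Before wiring steps 2 and 3 on the canvas, it can help to check the rule against a sample payload. ColdChainRouter is a hypothetical plain-Java sketch. It splits the sensor CSV into one reading per record, which is what SplitRecord with Records per Split = 1 produces. It then applies the same strict comparison as ${temperature:gt(40)}. The sensor_id,temperature header and the °F values are assumptions about the sensor API's CSV.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the cold-chain split + route steps. */
final class ColdChainRouter {

    record Reading(String sensorId, double temperatureF) { }

    private ColdChainRouter() { }

    /** Parses "sensor_id,temperature" CSV (header row first) into one Reading per line. */
    static List<Reading> split(String csv) {
        List<Reading> readings = new ArrayList<>();
        String[] lines = csv.strip().split("\\R");
        for (int i = 1; i < lines.length; i++) {          // skip the header row
            if (lines[i].isBlank()) continue;
            String[] cols = lines[i].split(",");
            readings.add(new Reading(cols[0].trim(), Double.parseDouble(cols[1].trim())));
        }
        return readings;
    }

    /** Mirrors ${temperature:gt(40)}: strictly greater than 40 goes to too_warm. */
    static String route(Reading r) {
        return r.temperatureF() > 40 ? "too_warm" : "unmatched";
    }
}
```

A reading of exactly 40°F stays on unmatched, because gt is a strict greater-than.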
Bob's Common Daily Tasks:
Debug failed processor — Right-click → View Status History. Find error spike time. Read bulletin for stack trace.
Inspect stuck queue — Right-click connection → List Queue. Click any FlowFile → view attributes + download content.
Test JoltTransform — Use jolt.demo.io or a debug flow: GetFile → JoltTransform → LogAttribute → PutFile. Iterate until output matches schema.
Add Parameter Context — Menu → Parameter Contexts → Add. Create: db.url, api.key, s3.bucket. Right-click PG → Configure → select context.
Set up Controller Service — Right-click canvas → Configure → Controller Services → Add DBCPConnectionPool. Enable before using in processors.
View status history — Right-click a processor → View Status History to chart its bytes in/out over the last 24h and find bottlenecks. On a Process Group, the same option shows group-level totals.
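Parameter Contexts work by substitution. A property value such as Bearer #{api.key} is resolved against the context bound to the Process Group.

The Go sketch below imitates that lookup under simplifying assumptions. It ignores NiFi's ##{ escape syntax and its parameter-name rules. The values in main are hypothetical.

```go
package main

import (
	"fmt"
	"regexp"
)

// paramRef matches NiFi-style references such as #{db.url}. Simplified:
// it ignores the ##{...} escape and NiFi's allowed-character rules.
var paramRef = regexp.MustCompile(`#\{([^}]+)\}`)

// resolve substitutes references from ctx, a stand-in for a Parameter Context.
// An undefined parameter is an error, in the spirit of NiFi flagging a
// component invalid when it references a parameter that does not exist.
func resolve(value string, ctx map[string]string) (string, error) {
	var missing string
	out := paramRef.ReplaceAllStringFunc(value, func(m string) string {
		name := paramRef.FindStringSubmatch(m)[1]
		v, ok := ctx[name]
		if !ok && missing == "" {
			missing = name
		}
		return v
	})
	if missing != "" {
		return "", fmt.Errorf("parameter %q is not defined", missing)
	}
	return out, nil
}

func main() {
	ctx := map[string]string{"api.key": "abc123", "s3.bucket": "freshmart-raw"}
	s, err := resolve("Bearer #{api.key}", ctx)
	fmt.Println(s, err) // Bearer abc123 <nil>
}
```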
🔧
Persona 2 — Flow Operator

Diana Reyes — DevOps On-Call

dreyes@freshmart.com · Start/Stop processors · Cannot edit configs or build flows

Diana keeps production flows running. She responds to PagerDuty alerts and restarts failed processors. At 2 AM the "NiFi ERROR Bulletins Detected" alert fires: the PostgreSQL database has crashed because its disk is full.

DIANA'S 2AM INCIDENT RESPONSE
1
Log into NiFi UI. Click bell icon (bulletin board). Find: "PutDatabaseRecord — Connection refused: postgres:5432" at 2:17 AM.
2
Find the red Process Group: "Sales ETL". Right-click PutDatabaseRecord → View Status History → confirm failure start time 2:17 AM.
3
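Stop PutDatabaseRecord to halt the error spam; incoming FlowFiles wait in its upstream queue in the meantime. SSH to the postgres host: sudo systemctl status postgresql shows the service is down, and df -h confirms the disk is full.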
4
Escalate to the DBA team. After the database recovers, right-click PutDatabaseRecord → Start and watch the queue depth fall on the canvas.
5
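Check the DLQ depth: aws sqs get-queue-attributes --queue-url "$(aws sqs get-queue-url --queue-name freshmart-dlq --query QueueUrl --output text)" --attribute-names ApproximateNumberOfMessages. Replay any failed FlowFiles via Menu → Data Provenance → open the event's details → Content tab → Replay.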
⚠️ Operator Rule: Diana has the operate permission but NOT modify the component. If a fix needs a config change (e.g. a new JDBC URL), she escalates to Bob, who deploys it via Registry. This keeps every change version-controlled.
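Diana's first step, scanning the bulletin board for ERROR entries, can also be scripted against NiFi's REST API. The Go sketch below only parses a response body; fetching and authenticating are left out.

It assumes the JSON shape of GET /nifi-api/flow/bulletin-board: bulletinBoard.bulletins[].bulletin with level, sourceName, message and timestamp fields. Confirm that shape for your NiFi version. The sample payload is made up.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// bulletinBoard models only the fields read here. The shape is an assumption
// based on NiFi's GET /nifi-api/flow/bulletin-board response.
type bulletinBoard struct {
	BulletinBoard struct {
		Bulletins []struct {
			Bulletin struct {
				SourceName string `json:"sourceName"`
				Level      string `json:"level"`
				Message    string `json:"message"`
				Timestamp  string `json:"timestamp"`
			} `json:"bulletin"`
		} `json:"bulletins"`
	} `json:"bulletinBoard"`
}

// errorBulletins returns "timestamp source: message" lines for ERROR bulletins only.
func errorBulletins(raw []byte) ([]string, error) {
	var bb bulletinBoard
	if err := json.Unmarshal(raw, &bb); err != nil {
		return nil, err
	}
	var out []string
	for _, e := range bb.BulletinBoard.Bulletins {
		b := e.Bulletin
		if b.Level == "ERROR" {
			out = append(out, fmt.Sprintf("%s %s: %s", b.Timestamp, b.SourceName, b.Message))
		}
	}
	return out, nil
}

func main() {
	// Hypothetical payload: one ERROR and one WARNING bulletin.
	sample := []byte(`{"bulletinBoard":{"bulletins":[
	  {"bulletin":{"sourceName":"PutDatabaseRecord","level":"ERROR",
	   "message":"Connection refused: postgres:5432","timestamp":"02:17:03 UTC"}},
	  {"bulletin":{"sourceName":"GetFile","level":"WARNING",
	   "message":"slow disk","timestamp":"02:16:40 UTC"}}]}}`)
	lines, err := errorBulletins(sample)
	if err != nil {
		panic(err)
	}
	for _, l := range lines {
		fmt.Println(l)
	}
}
```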
📊
Persona 3 — Data Analyst

Emily Park — Analytics Team

Read-only canvas + provenance

Emily traces data quality issues back to their NiFi source. Her task: find out why Saturday sales numbers look wrong.

Emily's Investigation Steps:
1. View "Sales ETL" canvas → right-click GetFile → View Status History
2. Confirm: FlowFile rate = zero from 8:02–10:47 AM Saturday
3. Menu → Data Provenance → search for SEND events from the PutDatabaseRecord processor (by its Component ID) during Saturday morning
4. Zero DB writes during that window → data gap confirmed → backfill request to dev team
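Emily's steps 1 and 2 boil down to finding runs of zero throughput in a status-history series. The small Go sketch below assumes the counts have already been exported as one number per interval; the sample values are hypothetical. Each gap it reports is a window to cross-check against provenance in step 3.

```go
package main

import "fmt"

// Gap is a run of consecutive zero-count samples, as inclusive indexes.
type Gap struct{ Start, End int }

// zeroGaps returns every run of at least minLen consecutive zero counts.
func zeroGaps(counts []int, minLen int) []Gap {
	var gaps []Gap
	start := -1 // index where the current zero run began, or -1
	for i, c := range counts {
		if c == 0 {
			if start < 0 {
				start = i
			}
			continue
		}
		if start >= 0 && i-start >= minLen {
			gaps = append(gaps, Gap{start, i - 1})
		}
		start = -1
	}
	// A zero run that lasts to the end of the series.
	if start >= 0 && len(counts)-start >= minLen {
		gaps = append(gaps, Gap{start, len(counts) - 1})
	}
	return gaps
}

func main() {
	// Hypothetical per-interval FlowFile counts; indexes 2..4 are an outage.
	counts := []int{120, 98, 0, 0, 0, 105}
	fmt.Println(zeroGaps(counts, 2)) // [{2 4}]
}
```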
🏪
Persona 4 — Store Manager

Marcus Williams — Store FM-042

Scoped to his store only

Marcus sees only the "Store FM-042 Status" Process Group. He can see its green/red pipeline status but cannot open configurations, view FlowFile data, or see other stores.

How Admin Scoped Marcus's Access:
1. Create user: mwilliams@freshmart.com
2. Grant global: /flow READ (to log in)
3. Right-click "Store FM-042 Status" PG → Manage Access Policies → view component → add group "Store Managers FM-042" (READ only)
4. Do NOT add: modify, operate, view data, access controller
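Marcus's setup relies on policy inheritance. A component with no policy of its own uses its parent Process Group's policy. An override on one PG replaces the inherited policy for that PG and its children.

The Go sketch below models only the view policy on a tree of groups. Group names other than Store FM-042 Status and Store Managers FM-042 are hypothetical.

```go
package main

import "fmt"

// Group is a Process Group in a simplified canvas tree.
type Group struct {
	Name   string
	Parent *Group
	// Viewers is this group's own view policy; nil means none is defined
	// here, so the parent's policy applies (inheritance).
	Viewers map[string]bool
}

// canView walks up to the first group that defines a view policy and checks
// membership there. An override therefore replaces, not extends, the parent.
func canView(g *Group, userGroup string) bool {
	for ; g != nil; g = g.Parent {
		if g.Viewers != nil {
			return g.Viewers[userGroup]
		}
	}
	return false
}

func main() {
	root := &Group{Name: "FreshMart", Viewers: map[string]bool{"Flow Developers": true}}
	fm042 := &Group{Name: "Store FM-042 Status", Parent: root,
		Viewers: map[string]bool{"Store Managers FM-042": true}}
	fm043 := &Group{Name: "Store FM-043 Status", Parent: root}
	inner := &Group{Name: "FM-042 Alerts", Parent: fm042}

	fmt.Println(canView(fm042, "Store Managers FM-042")) // true
	fmt.Println(canView(inner, "Store Managers FM-042")) // true (inherited)
	fmt.Println(canView(fm043, "Store Managers FM-042")) // false
}
```

In this model the override on Store FM-042 Status also locks out Flow Developers. That is why, when overriding a policy, NiFi lets you start from a copy of the inherited one.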
📋
Persona 5 — Compliance Auditor

Janet Okonkwo — Compliance Officer

jokonkwo@freshmart.com · Provenance records only · No canvas, no configs, no content download

During a food safety audit, Janet must prove that recalled product BAN-CAVENDISH-1LB was tracked through every system on 2024-11-01. NiFi provenance is the forensic trail.

JANET'S RECALL AUDIT — PROVENANCE TRAIL FOR BAN-CAVENDISH-1LB
Event | Processor | What Happened | Time
RECEIVE | GetSFTP | Received DOLE_20241101.csv from the vendor SFTP server | 06:14:23
FORK | SplitRecord | Batch split into 240 individual product FlowFiles | 06:14:25
ATTR MOD | LookupRecord | Mapped DOLE-BAN-001 → sku=BAN-CAVENDISH-1LB | 06:14:31
SEND | PutDatabaseRecord | Inserted into DW_SALES_FACT (240 cases logged) | 06:14:51
SEND | PublishKafka | Published to vendor-deliveries Kafka topic | 06:14:51
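Janet's trail is a lineage query. Start from the banana FlowFile and walk back through the FORK to the batch FlowFile that GetSFTP received.

The Go sketch below models that walk with pared-down events taken from the table. The FlowFile IDs and the sibling event are hypothetical, and real provenance events carry far more detail.

```go
package main

import (
	"fmt"
	"sort"
)

// Event is a pared-down provenance event; real NiFi events have many more fields.
type Event struct {
	Type      string
	Processor string
	FlowFile  string   // ID of the FlowFile the event describes
	Children  []string // FORK only: IDs of the FlowFiles split off
	Time      string   // HH:MM:SS on the same day, so string order is time order
}

// lineage returns events on ff or any ancestor of ff, in time order.
// Ancestors are found through FORK events whose Children include a related ID.
func lineage(events []Event, ff string) []Event {
	related := map[string]bool{ff: true}
	for changed := true; changed; {
		changed = false
		for _, e := range events {
			if related[e.FlowFile] {
				continue
			}
			for _, c := range e.Children {
				if related[c] {
					related[e.FlowFile] = true // e's FlowFile is a parent
					changed = true
					break
				}
			}
		}
	}
	var out []Event
	for _, e := range events {
		if related[e.FlowFile] {
			out = append(out, e)
		}
	}
	sort.SliceStable(out, func(i, j int) bool { return out[i].Time < out[j].Time })
	return out
}

func main() {
	events := []Event{
		{Type: "RECEIVE", Processor: "GetSFTP", FlowFile: "ff-batch", Time: "06:14:23"},
		{Type: "FORK", Processor: "SplitRecord", FlowFile: "ff-batch", Children: []string{"ff-ban", "ff-other"}, Time: "06:14:25"},
		{Type: "ATTR MOD", Processor: "LookupRecord", FlowFile: "ff-ban", Time: "06:14:31"},
		{Type: "SEND", Processor: "PutDatabaseRecord", FlowFile: "ff-ban", Time: "06:14:51"},
		{Type: "SEND", Processor: "PublishKafka", FlowFile: "ff-ban", Time: "06:14:51"},
		{Type: "SEND", Processor: "PutDatabaseRecord", FlowFile: "ff-other", Time: "06:14:52"}, // sibling, excluded
	}
	for _, e := range lineage(events, "ff-ban") {
		fmt.Printf("%s %-8s %-18s %s\n", e.Time, e.Type, e.Processor, e.FlowFile)
	}
}
```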
All Users — Troubleshooting

🔧 Common Problems & Fixes

Symptom | Diagnosis & Fix
Processor shows INVALID (yellow) | Right-click → Configure. Red fields = missing required properties. Check that the Controller Services it uses are enabled and that a Parameter Context is assigned to this Process Group.
Queue growing, back-pressure | Downstream is too slow. Fix: (1) increase concurrent tasks on the slow processor, (2) raise Max Total Connections on the DBCPConnectionPool so the extra tasks get connections, (3) reduce MergeRecord batch size to clear faster.
FlowFiles vanishing silently | Check auto-terminated relationships: right-click processor → Configure → Relationships tab. Uncheck any relationship that should not be auto-terminated and connect it to an error path.
ExecuteGroovyScript fails | Read the bulletin stack trace. Common causes: (1) missing import, (2) null FlowFile: add if (!flowFile) return right after session.get(), (3) a third-party JAR the script needs is not on the processor's Additional classpath.
InvokeHTTP → 401 Unauthorized | API token expired. Update the Parameter Context with the new token. Send it as a dynamic property named Authorization with value Bearer #{api.token}.
NiFi node shows Disconnected | Check: (1) ZooKeeper is running on all 3 nodes, (2) ports 2181/11443/6342 are open between nodes, (3) tail nifi-app.log on the disconnected node for the root cause.
JoltTransformJSON → empty output | JOLT spec error. Test the spec at jolt.demo.io. Common cause: wrong shift path (field names are case-sensitive). Add LogAttribute downstream with Log Payload = true to print the actual output.
PutDatabaseRecord → "column not found" | Schema mismatch: the Record Reader's schema does not match the table's column names. Add an UpdateRecord step to rename fields, or update the Avro schema used by the Record Reader.
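The back-pressure row, and Diana's stopped PutDatabaseRecord, come down to one rule. When a connection's queue reaches its threshold, NiFi stops scheduling the upstream processor until the queue drains.

Below is a toy Go simulation that counts FlowFiles per tick. The thresholds and rates are illustrative; NiFi's default object threshold for new connections is 10,000. The size-based threshold is ignored.

```go
package main

import "fmt"

// Connection models a queue with an object-count back-pressure threshold.
type Connection struct {
	Queued    int // FlowFiles waiting in the queue
	Threshold int // back-pressure object threshold
}

// full reports whether back-pressure applies, i.e. upstream is not scheduled.
func (c *Connection) full() bool { return c.Queued >= c.Threshold }

// simulate runs ticks in which upstream adds `in` FlowFiles unless back-pressure
// is on, then downstream removes up to `out`. It returns depth after each tick.
func simulate(c *Connection, in, out, ticks int) []int {
	depths := make([]int, 0, ticks)
	for t := 0; t < ticks; t++ {
		if !c.full() {
			c.Queued += in // one upstream run can push past the threshold
		}
		drained := out
		if drained > c.Queued {
			drained = c.Queued
		}
		c.Queued -= drained
		depths = append(depths, c.Queued)
	}
	return depths
}

func main() {
	c := &Connection{Threshold: 10}
	fmt.Println(simulate(c, 5, 3, 6)) // [2 4 6 8 10 7]
}
```

With out = 0 (downstream stopped), the queue climbs to, or just past, the threshold and then holds. That is what Diana sees until she restarts the processor. Raising out to match in (more concurrent tasks) keeps the queue from growing.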