;; EXAMPLE: Storing an object in a state map to control its lifecycle
;; Shared state: a ref wrapping a map (initially empty). start->streams stores
;; the KafkaStreams instance it creates under the :streams key and reads it back
;; from here before starting it.
(def state (ref {}))
(defn start->streams
  "Creates (if needed) and starts the KafkaStreams instance held in `state`.

  When `state` does not yet hold a KafkaStreams under :streams, the stream
  properties are built from the configuration read via CONFIG_PATH, a new
  instance is constructed from `(topology)` and stored in `state`.
  The stored instance, if any, is then started.

  Failures while creating the instance are logged, not rethrown; in that case
  nothing is started. Returns nil."
  []
  (log/info "[start->streams] enter")
  ;; deref state and do something with it (in this case, check if we have created it)
  (when-not (instance? KafkaStreams (:streams @state))
    (let [config-file (s/conform ::configs/configuration-file (configs/config CONFIG_PATH))
          kafka-config (s/conform ::configs/kafka-configuration
                                  (:processor.config/kafka-configuration config-file))
          stream-processing-props {StreamsConfig/APPLICATION_ID_CONFIG (:applicationid kafka-config)
                                   StreamsConfig/COMMIT_INTERVAL_MS_CONFIG (:auto.commit.interval.ms kafka-config)
                                   StreamsConfig/BOOTSTRAP_SERVERS_CONFIG (:bootstrap-servers kafka-config)
                                   StreamsConfig/DEFAULT_KEY_SERDE_CLASS_CONFIG (.getName (.getClass (Serdes/String)))
                                   StreamsConfig/DEFAULT_VALUE_SERDE_CLASS_CONFIG (.getName (.getClass (Serdes/String)))
                                   StreamsConfig/PROCESSING_GUARANTEE_CONFIG StreamsConfig/EXACTLY_ONCE}]
      (try
        (log/infof "[start->streams] creating kafka stream with config: %s" stream-processing-props)
        ;; Construct the object BEFORE entering the transaction: a dosync body
        ;; may be retried, and a constructor call inside it could run more than
        ;; once, leaking extra KafkaStreams instances.
        (let [streams (KafkaStreams. (topology) (StreamsConfig. stream-processing-props))]
          (dosync
            ;; update the ref and store the object in the state map
            (alter state assoc :streams streams)))
        (log/info "[start->streams] stream created")
        (catch Exception e
          ;; pass the throwable first so the logger records it with a message
          (log/error e "[start->streams] failed to create kafka stream")))))
  ;; deref the state and call an fn on the contained object; guard against nil
  ;; so a failed creation above does not turn into a NullPointerException here.
  ;; NOTE(review): start is invoked on every call, including when the instance
  ;; already existed - confirm callers never call this on a running stream.
  (if-let [^KafkaStreams streams (:streams @state)]
    (.start streams)
    (log/error "[start->streams] no kafka stream available; nothing started")))