![[KAFKA] Implementing Kafka streaming with Azure Event Hubs: Producer and Consumer code examples and configuration](/static/4087eff491a762f51181e1990de1523b/744d4/eventhub.png)
```bash
$ conda install -c conda-forge python-confluent-kafka
```
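The same client is also published on PyPI as `confluent-kafka`, so if you are not using conda you can install it with pip instead:

```bash
$ pip install confluent-kafka
```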
```python
from confluent_kafka import Producer
import sys

if __name__ == '__main__':
    if len(sys.argv) != 2:
        sys.stderr.write('Usage: %s <topic>\n' % sys.argv[0])
        sys.exit(1)
    topic = sys.argv[1]

    conf = {
        'bootstrap.servers': '<EventHub Namespace Name>.servicebus.windows.net:9093',  # replace with the namespace of the Event Hub you created
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '$ConnectionString',
        'sasl.password': '<SAS Connection Endpoint URL>',  # paste the SAS connection string issued above
        'client.id': 'nasa1515-producer'
    }

    # Create Producer instance
    p = Producer(**conf)

    # Delivery report callback: reports per-message success or failure
    def delivery_callback(err, msg):
        if err:
            sys.stderr.write('%% Message failed delivery: %s\n' % err)
        else:
            sys.stderr.write('%% Message delivered to %s [%d] @ %d\n' %
                             (msg.topic(), msg.partition(), msg.offset()))

    # Write 0-999 to the topic (I generated strings from the numbers 0-999 and published them)
    for i in range(0, 1000):
        try:
            p.produce(topic, 'Kafka_data_nasa1515-' + str(i), callback=delivery_callback)
        except BufferError:
            sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' % len(p))
        p.poll(0)

    # Wait until all messages have been delivered
    sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
    p.flush()
```
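For reference, the `sasl.password` value is the entire connection string copied from the namespace's Shared access policies blade; it typically has the following shape (all fields are placeholders):

```text
Endpoint=sb://<EventHub Namespace Name>.servicebus.windows.net/;SharedAccessKeyName=<Policy Name>;SharedAccessKey=<Key>
```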
```bash
$ python3 producer.py <Topic Name>
...
$ python3 /home/nasa1515/docker/producer/Azure/producer.py nasatopic   # nasatopic -> the topic name to create
```
```
(kafka) nasa1515@L-wslee:~$ python3 /home/nasa1515/docker/producer/Azure/producer.py nasatopic
% Waiting for 1000 deliveries
% Message delivered to nasatopic [0] @ 0
% Message delivered to nasatopic [0] @ 1
% Message delivered to nasatopic [0] @ 2
% Message delivered to nasatopic [0] @ 3
% Message delivered to nasatopic [0] @ 4
% Message delivered to nasatopic [0] @ 5
% Message delivered to nasatopic [0] @ 6
% Message delivered to nasatopic [0] @ 7
% Message delivered to nasatopic [0] @ 8
% Message delivered to nasatopic [0] @ 9
....
```
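Every message above landed in partition `[0]` because `produce()` was called without a key. If your Event Hub has more than one partition, passing a `key=` lets the default partitioner spread records while keeping records with the same key on the same partition. A minimal sketch of the same loop; the `device-%d` key scheme is hypothetical:

```python
# Same producer loop as above, but with a message key. Records sharing
# a key always land in the same partition; distinct keys spread load.
for i in range(0, 1000):
    p.produce(topic,
              value='Kafka_data_nasa1515-' + str(i),
              key='device-%d' % (i % 4),  # hypothetical key scheme
              callback=delivery_callback)
    p.poll(0)
p.flush()
```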
```python
from confluent_kafka import Consumer, KafkaException, KafkaError
import sys
import getopt
import json
import logging
from pprint import pformat


def stats_cb(stats_json_str):
    stats_json = json.loads(stats_json_str)
    print('\nKAFKA Stats: {}\n'.format(pformat(stats_json)))


def print_usage_and_exit(program_name):
    sys.stderr.write('Usage: %s [options..] <consumer-group> <topic1> <topic2> ..\n' % program_name)
    options = '''
 Options:
  -T <intvl>   Enable client statistics at specified interval (ms)
'''
    sys.stderr.write(options)
    sys.exit(1)


if __name__ == '__main__':
    optlist, argv = getopt.getopt(sys.argv[1:], 'T:')
    if len(argv) < 2:
        print_usage_and_exit(sys.argv[0])

    group = argv[0]
    topics = argv[1:]

    conf = {
        'bootstrap.servers': '<Your NameSpace Name>.servicebus.windows.net:9093',  # enter your Event Hubs namespace name
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '$ConnectionString',
        'sasl.password': '<SAS Token Endpoint URL>',  # enter the SAS connection string
        'group.id': group,
        'client.id': '<Custom>',
        'request.timeout.ms': 60000,  # producer-only property; triggers the CONFWARN seen in the log below
        'session.timeout.ms': 60000,
        'default.topic.config': {'auto.offset.reset': 'smallest'}  # deprecated alias; newer clients use a top-level 'auto.offset.reset'
    }

    # Check to see if -T option exists
    for opt in optlist:
        if opt[0] != '-T':
            continue
        try:
            intval = int(opt[1])
        except ValueError:
            sys.stderr.write("Invalid option value for -T: %s\n" % opt[1])
            sys.exit(1)
        if intval <= 0:
            sys.stderr.write("-T option value needs to be larger than zero: %s\n" % opt[1])
            sys.exit(1)
        conf['stats_cb'] = stats_cb
        conf['statistics.interval.ms'] = intval

    # Create logger for consumer (logs will be emitted when poll() is called)
    logger = logging.getLogger('consumer')
    logger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
    logger.addHandler(handler)

    # Create Consumer instance
    # Hint: try debug='fetch' to generate some log messages
    c = Consumer(conf, logger=logger)

    def print_assignment(consumer, partitions):
        print('Assignment:', partitions)

    # Subscribe to topics
    c.subscribe(topics, on_assign=print_assignment)

    # Read messages from Kafka, print to stdout
    try:
        while True:
            msg = c.poll(timeout=100.0)
            if msg is None:
                continue
            if msg.error():
                # Error or event
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    # Error
                    raise KafkaException(msg.error())
            else:
                # Proper message
                print(msg.value())
    except KeyboardInterrupt:
        sys.stderr.write('%% Aborted by user\n')
    finally:
        # Close down consumer to commit final offsets.
        c.close()
```
```bash
$ python3 Consum.py <Consumer_GROUP_ID> <TOPIC_NAME_1> <TOPIC_NAME_2> ...
...
$ python3 /home/nasa1515/docker/producer/Azure/Consum.py $Default nasatopic nasatopic
```

```
2022-05-12 10:11:59,390 WARNING  CONFWARN [nasa1515-consumer#consumer-1] [thrd:app]: Configuration property request.timeout.ms is a producer property and will be ignored by this consumer instance
Assignment: [TopicPartition{topic=nasatopic,partition=0,offset=-1001,error=None}]
b'Kafka_data_nasa1515-0'
b'Kafka_data_nasa1515-1'
b'Kafka_data_nasa1515-2'
b'Kafka_data_nasa1515-3'
b'Kafka_data_nasa1515-4'
b'Kafka_data_nasa1515-5'
b'Kafka_data_nasa1515-6'
b'Kafka_data_nasa1515-7'
b'Kafka_data_nasa1515-8'
b'Kafka_data_nasa1515-9'
b'Kafka_data_nasa1515-10'
b'Kafka_data_nasa1515-11'
b'Kafka_data_nasa1515-12'
...
...
```
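The consumer above leaves offset commits to librdkafka's auto-commit. If you want at-least-once semantics, where an offset is only committed after the message has actually been processed, you can disable auto-commit and commit explicitly. A minimal sketch, assuming the same `conf` and `topics` as in the script above:

```python
from confluent_kafka import Consumer, KafkaException

# Sketch: at-least-once consumption with manual commits.
# Assumes `conf` and `topics` are defined as in the script above.
conf['enable.auto.commit'] = False  # stop librdkafka from committing in the background

c = Consumer(conf)
c.subscribe(topics)
try:
    while True:
        msg = c.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())
        print(msg.value())                         # process the message first ...
        c.commit(message=msg, asynchronous=False)  # ... then commit its offset synchronously
finally:
    c.close()
```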

The same publish/subscribe flow can be implemented on GCP Pub/Sub. First, the publisher:

```python
import json
from google.auth import jwt
from concurrent import futures
from google.cloud import pubsub_v1

# Load the service account key and build JWT credentials scoped to the Publisher API
service_account_info = json.load(open("/home/nasa1515/docker/producer/GCP/data-cloocus-ffd800735dd1.json"))
audience_pub = "https://pubsub.googleapis.com/google.pubsub.v1.Publisher"
credentials = jwt.Credentials.from_service_account_info(
    service_account_info, audience=audience_pub
)

publisher = pubsub_v1.PublisherClient(credentials=credentials)
project_id = "data-cloocus"
topic_id = "pubsub_nasa1515"
topic_path = publisher.topic_path(project_id, topic_id)

# Publish messages 1-99, blocking on each future to print its message ID
for n in range(1, 100):
    data_str = f"nasa1515_Pubsub_Message : {n}"
    data = data_str.encode("utf-8")
    future = publisher.publish(topic_path, data)
    print(future.result())

print(f"Published messages to {topic_path}.")
And the subscriber:

```python
import json
from google.auth import jwt
from google.cloud import pubsub_v1

# Load the service account key and build JWT credentials scoped to the Subscriber API
service_account_info = json.load(open("/home/nasa1515/docker/producer/GCP/data-cloocus-ffd800735dd1.json"))
audience_sub = "https://pubsub.googleapis.com/google.pubsub.v1.Subscriber"
credentials = jwt.Credentials.from_service_account_info(
    service_account_info, audience=audience_sub
)

subscriber = pubsub_v1.SubscriberClient(credentials=credentials)
project_id = "data-cloocus"
topic_id = "pubsub_nasa1515"
subscription = "pubsub_nasa1515-sub"
topic_name = f'projects/{project_id}/topics/{topic_id}'
subscription_name = f'projects/{project_id}/subscriptions/{subscription}'


def callback(message):
    print(message.data)
    message.ack()


# Reuse the credentialed client; the context manager closes it on exit
with subscriber:
    try:
        # Reuse the subscription if it already exists
        response = subscriber.get_subscription(subscription=subscription_name)
        print(response)
    except Exception:
        # Subscription not found: create it, then start the streaming pull
        subscriber.create_subscription(name=subscription_name, topic=topic_name)
        future = subscriber.subscribe(subscription_name, callback)
    else:
        future = subscriber.subscribe(subscription_name, callback)

    try:
        # Block until interrupted; messages are handled by callback
        future.result()
    except KeyboardInterrupt:
        future.cancel()
```
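`future.result()` with no arguments blocks until the process is interrupted. For a bounded run (for example in a test), you can pass a timeout and cancel the streaming pull when it expires. A sketch of that inner loop, assuming `subscriber`, `subscription_name`, and `callback` as defined above:

```python
# Sketch: listen for a bounded window instead of blocking forever.
from concurrent.futures import TimeoutError

streaming_pull_future = subscriber.subscribe(subscription_name, callback)
try:
    streaming_pull_future.result(timeout=30.0)  # seconds; hypothetical window
except TimeoutError:
    streaming_pull_future.cancel()   # trigger the shutdown
    streaming_pull_future.result()   # block until the shutdown completes
```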