사진과 음악을 좋아하는 개발자 지망생의 블로그

[kafka] KafkaProducer.flush() 본문

개발/카프카(Kafka)

[kafka] KafkaProducer.flush()

devculture309 2023. 7. 19. 10:52
반응형

flush()

- 이 메서드를 호출하면 버퍼에 있는 모든 레코드가 즉시 전송 가능 상태가 되며(linger_ms가 0보다 크더라도), 이러한 레코드와 관련된 요청이 완료될 때까지 블로킹됨.

- flush()는 이전에 전송된 레코드의 처리가 완료될 경우 실행된다(예: Future.is_done() == True).

- 요청은 성공 여부와 상관 없이 Producer의 'acks' 구성에 따른 전송 결과가 반환되면 완료된 것으로 간주됨.

- 한 스레드가 flush 호출이 완료될 때까지 블로킹되는 동안 다른 스레드는 메시지를 계속해서 전송할 수 있지만, flush 호출이 시작된 이후에 전송된 메시지의 완료에 대해서는 보장할 수 없음

 

매개변수: timeout (float, 선택 사항) – 완료를 기다리기 위해 제공된 시간 제한(초)

예외 발생: KafkaTimeoutError – 제공된 제한 시간 내에 버퍼에 있는 레코드를 플러시하지 못한 경우 발생

 

producer.py

from time import sleep
from json import dumps
from kafka import KafkaProducer

# Producer 생성
producer = KafkaProducer(
   bootstrap_servers=['localhost:9092'],
   value_serializer=lambda x: dumps(x).encode('utf-8')
)

# Producer와 Broker간 연결 확인
print(producer.bootstrap_connected())

# 전송할 데이터('hellow world')
data = 'hello word'
producer.send('purchase', value=data)

# 데이터 전송 요청
producer.flush()

consumer.py

from kafka import KafkaConsumer
from json import loads
from time import sleep

# consumer 생성
consumer = KafkaConsumer(
   'purchase',
   bootstrap_servers=['localhost:9092'],
   auto_offset_reset='earliest',
   enable_auto_commit=True,
   value_deserializer=lambda x: loads(x.decode('utf-8'))
)

# Producer로부터 온 메세지 출력
for msg in consumer:
    print(msg)	# 객체 출력
    print(msg.value)	# 내용 출력

 

 

반응형

'개발 > 카프카(Kafka)' 카테고리의 다른 글

[Kafka] Message Processing guarantees  (2) 2023.07.17
[Kafka] 카프카 기초지식 정리  (0) 2023.07.15