Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 | 31 |
Tags
- 프로그래머스
- AWS
- 데브코스
- 에어플로우
- 개념정리
- 데이터엔지니어
- 파이썬
- airflow
- 데이터웨어하우스
- 개발
- 자료구조
- 운영체제
- SQL
- 관계형데이터베이스
- 기술면접
- 웹크롤링
- 취준
- DataWarehouse
- Amazon
- 웹자동화
- WEB
- Service
- 부트캠프
- 클라우드
- 알고리즘
- CS
- 웹스크래핑
- 데이터베이스
- 데이터엔지니어링
- Django
Archives
- Today
- Total
사진과 음악을 좋아하는 개발자 지망생의 블로그
[kafka] KafkaProducer.flush() 본문
반응형
flush()
- 이 메서드를 호출하면 버퍼에 있는 모든 레코드가 즉시 전송 가능 상태가 되며(linger_ms가 0보다 크더라도), 이러한 레코드와 관련된 요청이 완료될 때까지 블로킹됨.
- flush()는 이전에 전송된 레코드의 처리가 완료될 경우 실행된다(예: Future.is_done() == True).
- 요청은 성공 여부와 상관 없이 Producer의 'acks' 구성에 따른 전송 결과가 반환되면 완료된 것으로 간주됨.
- 한 스레드가 flush 호출이 완료될 때까지 블로킹되는 동안 다른 스레드는 메시지를 계속해서 전송할 수 있지만, flush 호출이 시작된 이후에 전송된 메시지의 완료에 대해서는 보장할 수 없음
매개변수: timeout (float, 선택 사항) – 완료를 기다리기 위해 제공된 시간 제한(초)
예외 발생: KafkaTimeoutError – 제공된 제한 시간 내에 버퍼에 있는 레코드를 플러시하지 못한 경우 발생
producer.py
from time import sleep
from json import dumps
from kafka import KafkaProducer
# Producer 생성
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda x: dumps(x).encode('utf-8')
)
# Producer와 Broker간 연결 확인
print(producer.bootstrap_connected())
# 전송할 데이터('hellow world')
data = 'hello word'
producer.send('purchase', value=data)
# 데이터 전송 요청
producer.flush()
consumer.py
from kafka import KafkaConsumer
from json import loads
from time import sleep
# consumer 생성
consumer = KafkaConsumer(
'purchase',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
value_deserializer=lambda x: loads(x.decode('utf-8'))
)
# Producer로부터 온 메세지 출력
for msg in consumer:
print(msg) # 객체 출력
print(msg.value) # 내용 출력
반응형
'개발 > 카프카(Kafka)' 카테고리의 다른 글
[Kafka] Message Processing guarantees (2) | 2023.07.17 |
---|---|
[Kafka] 카프카 기초지식 정리 (0) | 2023.07.15 |