gRPC Streaming API Tutorial: Server, Client, and Bidirectional Patterns with Go and Python
Hands-on gRPC streaming tutorial: server, client, and bidirectional streams with Go and Python, plus proto design, flow control, deadlines, security, and testing.
Overview
gRPC streaming lets clients and servers exchange a continuous sequence of messages over a single HTTP/2 connection. Instead of waiting for a single request/response (unary), streams keep the channel open so either side can send more data as it becomes available. This pattern is ideal for chat, telemetry, IoT, real-time analytics dashboards, and any workflow that benefits from low-latency, incremental delivery.
Key advantages:
- Strongly typed messages via Protocol Buffers (Protobuf)
- HTTP/2 multiplexing, header compression, and flow control
- Bidirectional communication and backpressure-aware APIs
- Cross-language support (Go, Python, Java, Node.js, C#, and more)
Note: Browsers need gRPC-Web or a proxy because native gRPC requires HTTP/2 with certain features that aren’t universally exposed by browsers.
Streaming patterns in gRPC
- Unary: one request, one response. Simple RPCs like GetUser.
- Server streaming: one request, many responses. Example: tailing logs.
- Client streaming: many requests, one response. Example: uploading a series of metrics.
- Bidirectional streaming: many requests, many responses. Example: chat or live collaboration.
How to choose:
- If the server produces a feed of updates: server streaming.
- If the client uploads a sequence for aggregation: client streaming.
- If both sides need interactive, low-latency exchange: bidirectional.
Designing the protocol (telemetry example)
We’ll design a simple telemetry service that demonstrates all three streaming modes.
syntax = "proto3";
package telemetry.v1;
option go_package = "example.com/telemetry/gen/telemetry/v1;telemetryv1";
message TailRequest {
  string service = 1; // which service to tail logs from
  int64 since_unix = 2; // start time (epoch secs)
}

message LogLine {
  int64 ts_unix = 1;
  string level = 2;
  string message = 3;
}

message Metric {
  string name = 1;
  double value = 2;
  int64 ts_unix = 3;
}

message UploadAck {
  int64 count = 1; // how many metrics processed
}

message ChatMessage {
  string user = 1;
  string body = 2;
  int64 sent_unix = 3;
}

service Telemetry {
  rpc TailLogs (TailRequest) returns (stream LogLine); // server-streaming
  rpc UploadMetrics (stream Metric) returns (UploadAck); // client-streaming
  rpc Chat (stream ChatMessage) returns (stream ChatMessage); // bidi-streaming
}
Generating code
- Protobuf compiler: install protoc.
- Go plugins: go install google.golang.org/protobuf/cmd/protoc-gen-go@latest and go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest.
- Python tools: python -m pip install grpcio grpcio-tools.
Commands:
# Go
protoc -I. --go_out=. --go-grpc_out=. telemetry.proto
# Python
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. telemetry.proto
Implementing a bidirectional stream in Go (server)
package main

import (
	"context"
	"errors"
	"io"
	"log"
	"net"

	telemetryv1 "example.com/telemetry/gen/telemetry/v1"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/status"
)

type server struct {
	telemetryv1.UnimplementedTelemetryServer
}

func (s *server) Chat(stream telemetryv1.Telemetry_ChatServer) error {
	ctx := stream.Context()
	for {
		in, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if errors.Is(err, context.Canceled) || status.Code(err) == codes.Canceled {
			return nil
		}
		if err != nil {
			return status.Errorf(codes.Internal, "recv error: %v", err)
		}
		// echo with a small prefix
		reply := &telemetryv1.ChatMessage{
			User:     "server",
			Body:     "ack: " + in.Body,
			SentUnix: in.SentUnix,
		}
		if err := stream.Send(reply); err != nil {
			if status.Code(err) == codes.Canceled {
				return nil
			}
			return status.Errorf(codes.Unavailable, "send error: %v", err)
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
	}
}

func main() {
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatal(err)
	}
	// For production, use TLS credentials instead of insecure.
	s := grpc.NewServer(grpc.Creds(insecure.NewCredentials()))
	telemetryv1.RegisterTelemetryServer(s, &server{})
	log.Println("gRPC server on :50051")
	if err := s.Serve(lis); err != nil {
		log.Fatal(err)
	}
}
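The server above only implements Chat. For completeness, here is a sketch of the server-streaming TailLogs handler on the same server type: one request in, many LogLine messages out, stopping promptly on cancellation. The fakeLines slice is an illustrative stand-in for a real log source, and "time" would need to be added to the server's imports.

```go
// TailLogs streams log lines to the client until the source is
// exhausted or the client goes away. fakeLines is illustrative only;
// a real implementation would read from a log store or a channel.
func (s *server) TailLogs(req *telemetryv1.TailRequest, stream telemetryv1.Telemetry_TailLogsServer) error {
	ctx := stream.Context()
	fakeLines := []*telemetryv1.LogLine{
		{TsUnix: time.Now().Unix(), Level: "INFO", Message: "started " + req.Service},
		{TsUnix: time.Now().Unix(), Level: "WARN", Message: "queue depth rising"},
	}
	for _, line := range fakeLines {
		if line.TsUnix < req.SinceUnix {
			continue // honor the requested start time
		}
		select {
		case <-ctx.Done():
			return ctx.Err() // client canceled or deadline hit; stop work
		default:
		}
		// Send blocks under HTTP/2 backpressure and errors on a dead stream.
		if err := stream.Send(line); err != nil {
			return err
		}
	}
	return nil // returning nil closes the stream with status OK
}
```

Returning from the handler is what ends a server stream; there is no explicit Close call on the server side.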
Implementing a bidirectional client in Go
package main

import (
	"bufio"
	"context"
	"log"
	"os"
	"time"

	telemetryv1 "example.com/telemetry/gen/telemetry/v1"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	conn, err := grpc.Dial("localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	c := telemetryv1.NewTelemetryClient(conn)

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	stream, err := c.Chat(ctx)
	if err != nil {
		log.Fatal(err)
	}

	// Receiver goroutine
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			in, err := stream.Recv()
			if err != nil {
				log.Println("recv:", err)
				return
			}
			log.Printf("%s: %s\n", in.User, in.Body)
		}
	}()

	// Send a few lines from stdin
	scanner := bufio.NewScanner(os.Stdin)
	log.Println("Type messages, Ctrl-D to end")
	for scanner.Scan() {
		msg := &telemetryv1.ChatMessage{User: "you", Body: scanner.Text(), SentUnix: time.Now().Unix()}
		if err := stream.Send(msg); err != nil {
			log.Println("send:", err)
			break
		}
	}
	if err := stream.CloseSend(); err != nil {
		log.Println("close send:", err)
	}
	<-done
}
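The remaining pattern, client streaming, follows from the same generated client: send any number of Metric messages, then half-close and wait for the single UploadAck. A sketch of an UploadMetrics helper, assuming a TelemetryClient constructed as in the Chat client above:

```go
// uploadMetrics sends a batch of metrics over a client stream and
// returns the count the server acknowledged.
func uploadMetrics(ctx context.Context, c telemetryv1.TelemetryClient, metrics []*telemetryv1.Metric) (int64, error) {
	stream, err := c.UploadMetrics(ctx)
	if err != nil {
		return 0, err
	}
	for _, m := range metrics {
		if err := stream.Send(m); err != nil {
			// On a Send error, stop sending; CloseAndRecv below
			// surfaces the server's actual status.
			break
		}
	}
	ack, err := stream.CloseAndRecv() // half-close, then wait for UploadAck
	if err != nil {
		return 0, err
	}
	return ack.Count, nil
}
```

CloseAndRecv combines CloseSend with the final Recv, which is why client-streaming RPCs don't need a receiver goroutine.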
Server-streaming client in Python (async)
import asyncio
import grpc

from telemetry.v1 import telemetry_pb2, telemetry_pb2_grpc

async def tail_logs():
    async with grpc.aio.insecure_channel("localhost:50051") as channel:
        stub = telemetry_pb2_grpc.TelemetryStub(channel)
        req = telemetry_pb2.TailRequest(service="orders", since_unix=0)
        call = stub.TailLogs(req, timeout=20.0)
        async for line in call:
            print(f"{line.ts_unix} {line.level}: {line.message}")

if __name__ == "__main__":
    asyncio.run(tail_logs())
Flow control, backpressure, and message sizing
gRPC rides on HTTP/2’s flow control. Practical tips:
- Treat Recv() as a natural backpressure signal. If the receiver is slow, the sender's window shrinks and Send() eventually blocks or errors.
- Prefer many small messages to a single huge one. Chunk large payloads (e.g., files) into slices and include sequence numbers.
- Enable compression for large textual payloads (e.g., gzip); avoid for already-compressed data.
- Keep protobuf messages stable and versioned; add fields with new tags and avoid renumbering existing fields.
- Tune max message sizes on both client and server if you legitimately need bigger frames.
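To make the chunking advice concrete, a small helper can split a payload into fixed-size slices with sequence numbers for reassembly on the other side. The Chunk type and field names here are illustrative, not part of the telemetry proto above:

```go
package main

import "fmt"

// Chunk pairs a payload slice with its position in the original blob,
// so the receiver can detect gaps and reassemble in order.
type Chunk struct {
	Seq  int
	Data []byte
}

// splitChunks cuts data into pieces of at most size bytes.
func splitChunks(data []byte, size int) []Chunk {
	var chunks []Chunk
	for i, seq := 0, 0; i < len(data); i, seq = i+size, seq+1 {
		end := i + size
		if end > len(data) {
			end = len(data)
		}
		chunks = append(chunks, Chunk{Seq: seq, Data: data[i:end]})
	}
	return chunks
}

func main() {
	// A 20-byte payload in 8-byte chunks yields lengths 8, 8, 4.
	for _, c := range splitChunks([]byte("hello gRPC streaming"), 8) {
		fmt.Printf("seq=%d len=%d\n", c.Seq, len(c.Data))
	}
}
```

Each chunk would then be sent as one stream message, letting HTTP/2 flow control pace the transfer instead of buffering one giant frame.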
Deadlines, cancellation, retries
- Always set deadlines on RPCs. In Go, use context.WithTimeout; in Python, pass timeout=.
- Check ctx.Done() in server loops and return promptly when canceled.
- Implement idempotency keys before enabling automatic retries; retried streams can otherwise duplicate effects.
- For long-lived streams, consider heartbeats or application-level pings to detect half-open connections.
Example (Go, 10s deadline):
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := client.Chat(ctx)
Keepalive and connectivity
- Use gRPC keepalive pings to detect broken NATs or idle timeouts.
- For Go servers, keepalive.ServerParameters and keepalive.EnforcementPolicy can constrain overly aggressive clients.
- Reconnect with exponential backoff on the client; most gRPC clients include sensible defaults.
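Putting those knobs into code, here is a sketch of server- and client-side keepalive settings using the google.golang.org/grpc/keepalive package. The durations are illustrative starting points, not recommendations; tune them to outlast your load balancers and NAT idle timeouts.

```go
// Server: ping idle connections and bound how often clients may ping us.
srv := grpc.NewServer(
	grpc.KeepaliveParams(keepalive.ServerParameters{
		Time:    2 * time.Minute,  // ping a connection idle this long
		Timeout: 20 * time.Second, // wait this long for the ping ack
	}),
	grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
		MinTime:             30 * time.Second, // reject clients pinging more often
		PermitWithoutStream: true,             // allow pings with no active RPCs
	}),
)

// Client: keep long-lived streams alive through NATs and proxies.
conn, err := grpc.Dial(addr,
	grpc.WithTransportCredentials(insecure.NewCredentials()),
	grpc.WithKeepaliveParams(keepalive.ClientParameters{
		Time:                1 * time.Minute,
		Timeout:             10 * time.Second,
		PermitWithoutStream: true,
	}),
)
```

Keep the client's ping interval above the server's MinTime, or the server will close the connection with ENHANCE_YOUR_CALM.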
Security: TLS, mTLS, and auth
- Use TLS in production. Terminate at the service or a sidecar that preserves HTTP/2.
- For zero-trust environments, enable mutual TLS (mTLS) so both client and server authenticate using certificates.
- Propagate identity/authorization via per-RPC metadata (e.g., OAuth2 bearer tokens) and enforce on the server.
- Validate message payloads. Streaming makes it easy to send too much data; enforce quotas and per-stream limits.
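As a minimal mTLS example, the server-side tls.Config can be built with the standard library alone and then wrapped for gRPC with credentials.NewTLS and grpc.Creds. The file paths are placeholders:

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"os"
)

// mtlsConfig builds a server tls.Config that presents its own certificate
// and requires clients to present one signed by the CA in caFile.
func mtlsConfig(certFile, keyFile, caFile string) (*tls.Config, error) {
	cert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		return nil, err
	}
	caPEM, err := os.ReadFile(caFile)
	if err != nil {
		return nil, err
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, fmt.Errorf("no certificates parsed from %s", caFile)
	}
	return &tls.Config{
		Certificates: []tls.Certificate{cert},
		ClientCAs:    pool,
		ClientAuth:   tls.RequireAndVerifyClientCert, // this makes it mutual
		MinVersion:   tls.VersionTLS12,
	}, nil
}

func main() {
	cfg, err := mtlsConfig("server.crt", "server.key", "ca.crt")
	if err != nil {
		fmt.Println("load error:", err)
		return
	}
	fmt.Println("client auth mode:", cfg.ClientAuth)
}
```

On the gRPC side, pass credentials.NewTLS(cfg) to grpc.Creds when constructing the server in place of the insecure credentials used earlier.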
Observability and operations
- Interceptors/middleware: add logging, auth checks, and tracing without polluting handlers.
- Tracing: instrument with OpenTelemetry; trace a stream as a span with events per message or per batch.
- Metrics: record active streams, messages sent/received, bytes in/out, and latency per message.
- Structured logs: include RPC method, peer address, and auth subject when useful.
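As an example of the interceptor approach, a server-side stream interceptor can log every streaming RPC's method, duration, and final status without touching the handlers. A sketch against the grpc-go API (it needs the grpc, status, log, and time imports):

```go
// loggingStream satisfies grpc.StreamServerInterceptor: it wraps the
// real handler, times it, and logs the outcome in one place.
func loggingStream(
	srv interface{},
	ss grpc.ServerStream,
	info *grpc.StreamServerInfo,
	handler grpc.StreamHandler,
) error {
	start := time.Now()
	err := handler(srv, ss)
	log.Printf("stream %s dur=%s code=%s",
		info.FullMethod, time.Since(start), status.Code(err))
	return err
}

// Wire it up when building the server:
//   s := grpc.NewServer(grpc.StreamInterceptor(loggingStream))
```

The same hook point is where auth checks and OpenTelemetry tracing typically live, so handlers stay focused on business logic.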
Testing and debugging
- grpcurl: interact with servers and read server streams without writing a client.
- ghz/fortio: load test streaming RPCs; measure throughput and tail latencies.
- Reflection: enable server reflection in non-production environments to explore services quickly.
- Fault injection: test cancellations, slow consumers, and network partitions.
Example (grpcurl tailing logs):
grpcurl -plaintext \
-d '{"service":"orders","since_unix":0}' \
localhost:50051 telemetry.v1.Telemetry/TailLogs
Common pitfalls and how to avoid them
- Forgetting deadlines: streams run forever and leak resources. Always set timeouts or at least periodic heartbeats.
- Ignoring context: check ctx.Err() and stop work immediately on cancel.
- Over-large messages: prefer chunking; tune message limits conservatively.
- Silent drops by proxies: configure keepalives and server timeouts to outlast intermediaries or send app-level heartbeats.
- Backpressure bugs: never spin a tight send loop; respect Send() errors and block until the receiver catches up.
Production checklist
- Security: TLS/mTLS on by default; rotate certs; enforce authz per method.
- Reliability: deadlines, retries (idempotent only), exponential backoff, health checks.
- Performance: chunk large payloads, compress where helpful, tune flow control and message limits.
- Observability: tracing, metrics, structured logs, error categorization.
- Developer experience: reflection in staging, golden tests for protobuf changes, backward-compatible field evolution.
Where to go next
- Add paging or filters to TailLogs to support multiple consumers.
- Use bidirectional Chat as a foundation for command/response control planes.
- Integrate OpenTelemetry for traces and metrics, then use a dashboard to watch per-stream throughput and errors in real time.
By designing your Protobuf schema for evolution, respecting deadlines and backpressure, and investing in security and observability up front, you’ll ship gRPC streaming APIs that are fast, reliable, and a pleasure to maintain.