diff options
Diffstat (limited to 'vendor/cloud.google.com/go/logging/logging.go')
| -rw-r--r-- | vendor/cloud.google.com/go/logging/logging.go | 399 |
1 files changed, 231 insertions, 168 deletions
diff --git a/vendor/cloud.google.com/go/logging/logging.go b/vendor/cloud.google.com/go/logging/logging.go index 81ede6afb..4f26a1934 100644 --- a/vendor/cloud.google.com/go/logging/logging.go +++ b/vendor/cloud.google.com/go/logging/logging.go @@ -30,16 +30,17 @@ import ( "encoding/json" "errors" "fmt" + "io" "log" "net/http" "regexp" + "runtime" "strconv" "strings" "sync" "time" "unicode/utf8" - "cloud.google.com/go/internal/version" vkit "cloud.google.com/go/logging/apiv2" "cloud.google.com/go/logging/internal" "github.com/golang/protobuf/proto" @@ -50,6 +51,8 @@ import ( mrpb "google.golang.org/genproto/googleapis/api/monitoredres" logtypepb "google.golang.org/genproto/googleapis/logging/type" logpb "google.golang.org/genproto/googleapis/logging/v2" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/timestamppb" ) const ( @@ -75,7 +78,7 @@ const ( DefaultEntryCountThreshold = 1000 // DefaultEntryByteThreshold is the default value for the EntryByteThreshold LoggerOption. - DefaultEntryByteThreshold = 1 << 20 // 1MiB + DefaultEntryByteThreshold = 1 << 23 // 8MiB // DefaultBufferedByteLimit is the default value for the BufferedByteLimit LoggerOption. DefaultBufferedByteLimit = 1 << 30 // 1GiB @@ -87,16 +90,22 @@ const ( defaultWriteTimeout = 10 * time.Minute ) -// For testing: -var now = time.Now +var ( + // ErrRedirectProtoPayloadNotSupported is returned when Logger is configured to redirect output and + // tries to redirect logs with protobuf payload. + ErrRedirectProtoPayloadNotSupported = errors.New("printEntryToStdout: cannot find valid payload") -// ErrOverflow signals that the number of buffered entries for a Logger -// exceeds its BufferLimit. -var ErrOverflow = bundler.ErrOverflow + // For testing: + now = time.Now -// ErrOversizedEntry signals that an entry's size exceeds the maximum number of -// bytes that will be sent in a single call to the logging service. -var ErrOversizedEntry = bundler.ErrOversizedItem + // ErrOverflow signals that the number of buffered entries for a Logger + // exceeds its BufferLimit. + ErrOverflow = bundler.ErrOverflow + + // ErrOversizedEntry signals that an entry's size exceeds the maximum number of + // bytes that will be sent in a single call to the logging service. + ErrOversizedEntry = bundler.ErrOversizedItem +) // Client is a Logging client. A Client is associated with a single Cloud project. type Client struct { @@ -135,7 +144,10 @@ type Client struct { // By default NewClient uses WriteScope. To use a different scope, call // NewClient using a WithScopes option (see https://godoc.org/google.golang.org/api/option#WithScopes). func NewClient(ctx context.Context, parent string, opts ...option.ClientOption) (*Client, error) { - parent = makeParent(parent) + parent, err := makeParent(parent) + if err != nil { + return nil, err + } opts = append([]option.ClientOption{ option.WithScopes(WriteScope), }, opts...) @@ -143,7 +155,7 @@ func NewClient(ctx context.Context, parent string, opts ...option.ClientOption) if err != nil { return nil, err } - c.SetGoogleClientInfo("gccl", version.Repo) + c.SetGoogleClientInfo("gccl", internal.Version) client := &Client{ client: c, parent: parent, @@ -168,11 +180,15 @@ func NewClient(ctx context.Context, parent string, opts ...option.ClientOption) return client, nil } -func makeParent(parent string) string { +func makeParent(parent string) (string, error) { if !strings.ContainsRune(parent, '/') { - return "projects/" + parent + return "projects/" + parent, nil + } + prefix := strings.Split(parent, "/")[0] + if prefix != "projects" && prefix != "folders" && prefix != "billingAccounts" && prefix != "organizations" { + return parent, fmt.Errorf("parent parameter must start with 'projects/' 'folders/' 'billingAccounts/' or 'organizations/'") } - return parent + return parent, nil } // Ping reports whether the client's connection to the logging service and the @@ -230,114 +246,14 @@ type Logger struct { bundler *bundler.Bundler // Options - commonResource *mrpb.MonitoredResource - commonLabels map[string]string - ctxFunc func() (context.Context, func()) -} - -// A LoggerOption is a configuration option for a Logger. -type LoggerOption interface { - set(*Logger) -} - -// CommonLabels are labels that apply to all log entries written from a Logger, -// so that you don't have to repeat them in each log entry's Labels field. If -// any of the log entries contains a (key, value) with the same key that is in -// CommonLabels, then the entry's (key, value) overrides the one in -// CommonLabels. -func CommonLabels(m map[string]string) LoggerOption { return commonLabels(m) } - -type commonLabels map[string]string - -func (c commonLabels) set(l *Logger) { l.commonLabels = c } - -// ConcurrentWriteLimit determines how many goroutines will send log entries to the -// underlying service. The default is 1. Set ConcurrentWriteLimit to a higher value to -// increase throughput. -func ConcurrentWriteLimit(n int) LoggerOption { return concurrentWriteLimit(n) } - -type concurrentWriteLimit int - -func (c concurrentWriteLimit) set(l *Logger) { l.bundler.HandlerLimit = int(c) } - -// DelayThreshold is the maximum amount of time that an entry should remain -// buffered in memory before a call to the logging service is triggered. Larger -// values of DelayThreshold will generally result in fewer calls to the logging -// service, while increasing the risk that log entries will be lost if the -// process crashes. -// The default is DefaultDelayThreshold. -func DelayThreshold(d time.Duration) LoggerOption { return delayThreshold(d) } - -type delayThreshold time.Duration - -func (d delayThreshold) set(l *Logger) { l.bundler.DelayThreshold = time.Duration(d) } - -// EntryCountThreshold is the maximum number of entries that will be buffered -// in memory before a call to the logging service is triggered. Larger values -// will generally result in fewer calls to the logging service, while -// increasing both memory consumption and the risk that log entries will be -// lost if the process crashes. -// The default is DefaultEntryCountThreshold. -func EntryCountThreshold(n int) LoggerOption { return entryCountThreshold(n) } - -type entryCountThreshold int - -func (e entryCountThreshold) set(l *Logger) { l.bundler.BundleCountThreshold = int(e) } - -// EntryByteThreshold is the maximum number of bytes of entries that will be -// buffered in memory before a call to the logging service is triggered. See -// EntryCountThreshold for a discussion of the tradeoffs involved in setting -// this option. -// The default is DefaultEntryByteThreshold. -func EntryByteThreshold(n int) LoggerOption { return entryByteThreshold(n) } - -type entryByteThreshold int - -func (e entryByteThreshold) set(l *Logger) { l.bundler.BundleByteThreshold = int(e) } - -// EntryByteLimit is the maximum number of bytes of entries that will be sent -// in a single call to the logging service. ErrOversizedEntry is returned if an -// entry exceeds EntryByteLimit. This option limits the size of a single RPC -// payload, to account for network or service issues with large RPCs. If -// EntryByteLimit is smaller than EntryByteThreshold, the latter has no effect. -// The default is zero, meaning there is no limit. -func EntryByteLimit(n int) LoggerOption { return entryByteLimit(n) } - -type entryByteLimit int - -func (e entryByteLimit) set(l *Logger) { l.bundler.BundleByteLimit = int(e) } - -// BufferedByteLimit is the maximum number of bytes that the Logger will keep -// in memory before returning ErrOverflow. This option limits the total memory -// consumption of the Logger (but note that each Logger has its own, separate -// limit). It is possible to reach BufferedByteLimit even if it is larger than -// EntryByteThreshold or EntryByteLimit, because calls triggered by the latter -// two options may be enqueued (and hence occupying memory) while new log -// entries are being added. -// The default is DefaultBufferedByteLimit. -func BufferedByteLimit(n int) LoggerOption { return bufferedByteLimit(n) } - -type bufferedByteLimit int - -func (b bufferedByteLimit) set(l *Logger) { l.bundler.BufferedByteLimit = int(b) } - -// ContextFunc is a function that will be called to obtain a context.Context for the -// WriteLogEntries RPC executed in the background for calls to Logger.Log. The -// default is a function that always returns context.Background. The second return -// value of the function is a function to call after the RPC completes. -// -// The function is not used for calls to Logger.LogSync, since the caller can pass -// in the context directly. -// -// This option is EXPERIMENTAL. It may be changed or removed. -func ContextFunc(f func() (ctx context.Context, afterCall func())) LoggerOption { - return contextFunc(f) + commonResource *mrpb.MonitoredResource + commonLabels map[string]string + ctxFunc func() (context.Context, func()) + populateSourceLocation int + partialSuccess bool + redirectOutputWriter io.Writer } -type contextFunc func() (ctx context.Context, afterCall func()) - -func (c contextFunc) set(l *Logger) { l.ctxFunc = c } - // Logger returns a Logger that will write entries with the given log ID, such as // "syslog". A log ID must be less than 512 characters long and can only // include the following characters: upper and lower case alphanumeric @@ -349,10 +265,13 @@ func (c *Client) Logger(logID string, opts ...LoggerOption) *Logger { r = monitoredResource(c.parent) } l := &Logger{ - client: c, - logName: internal.LogPath(c.parent, logID), - commonResource: r, - ctxFunc: func() (context.Context, func()) { return context.Background(), nil }, + client: c, + logName: internal.LogPath(c.parent, logID), + commonResource: r, + ctxFunc: func() (context.Context, func()) { return context.Background(), nil }, + populateSourceLocation: DoNotPopulateSourceLocation, + partialSuccess: false, + redirectOutputWriter: nil, } l.bundler = bundler.NewBundler(&logpb.LogEntry{}, func(entries interface{}) { l.writeLogEntries(entries.([]*logpb.LogEntry)) @@ -464,10 +383,14 @@ func (v Severity) String() string { // Severity. func (v *Severity) UnmarshalJSON(data []byte) error { var s string - if err := json.Unmarshal(data, &s); err != nil { - return err + var i int + if strErr := json.Unmarshal(data, &s); strErr == nil { + *v = ParseSeverity(s) + } else if intErr := json.Unmarshal(data, &i); intErr == nil { + *v = Severity(i) + } else { + return fmt.Errorf("%v; %v", strErr, intErr) } - *v = ParseSeverity(s) return nil } @@ -705,28 +628,52 @@ func jsonValueToStructValue(v interface{}) *structpb.Value { // and will block, it is intended primarily for debugging or critical errors. // Prefer Log for most uses. func (l *Logger) LogSync(ctx context.Context, e Entry) error { - ent, err := toLogEntryInternal(e, l.client, l.client.parent) + ent, err := toLogEntryInternal(e, l, l.client.parent, 1) if err != nil { return err } + entries, hasInstrumentation := l.instrumentLogs([]*logpb.LogEntry{ent}) + if l.redirectOutputWriter != nil { + for _, ent = range entries { + err = serializeEntryToWriter(ent, l.redirectOutputWriter) + if err != nil { + break + } + } + return err + } _, err = l.client.client.WriteLogEntries(ctx, &logpb.WriteLogEntriesRequest{ - LogName: l.logName, - Resource: l.commonResource, - Labels: l.commonLabels, - Entries: []*logpb.LogEntry{ent}, + LogName: l.logName, + Resource: l.commonResource, + Labels: l.commonLabels, + Entries: entries, + PartialSuccess: l.partialSuccess || hasInstrumentation, }) return err } // Log buffers the Entry for output to the logging service. It never blocks. func (l *Logger) Log(e Entry) { - ent, err := toLogEntryInternal(e, l.client, l.client.parent) + ent, err := toLogEntryInternal(e, l, l.client.parent, 1) if err != nil { l.client.error(err) return } - if err := l.bundler.Add(ent, proto.Size(ent)); err != nil { - l.client.error(err) + + entries, _ := l.instrumentLogs([]*logpb.LogEntry{ent}) + if l.redirectOutputWriter != nil { + for _, ent = range entries { + err = serializeEntryToWriter(ent, l.redirectOutputWriter) + if err != nil { + l.client.error(err) + } + } + return + } + for _, ent = range entries { + if err := l.bundler.Add(ent, proto.Size(ent)); err != nil { + l.client.error(err) + } } } @@ -742,11 +689,16 @@ func (l *Logger) Flush() error { } func (l *Logger) writeLogEntries(entries []*logpb.LogEntry) { + partialSuccess := l.partialSuccess + if len(entries) > 1 { + partialSuccess = partialSuccess || hasInstrumentation(entries) + } req := &logpb.WriteLogEntriesRequest{ - LogName: l.logName, - Resource: l.commonResource, - Labels: l.commonLabels, - Entries: entries, + LogName: l.logName, + Resource: l.commonResource, + Labels: l.commonLabels, + Entries: entries, + PartialSuccess: partialSuccess, } ctx, afterCall := l.ctxFunc() ctx, cancel := context.WithTimeout(ctx, defaultWriteTimeout) @@ -767,7 +719,59 @@ func (l *Logger) writeLogEntries(entries []*logpb.LogEntry) { // (for example by calling SetFlags or SetPrefix). func (l *Logger) StandardLogger(s Severity) *log.Logger { return l.stdLoggers[s] } -var reCloudTraceContext = regexp.MustCompile( +func populateTraceInfo(e *Entry, req *http.Request) bool { + if req == nil { + if e.HTTPRequest != nil && e.HTTPRequest.Request != nil { + req = e.HTTPRequest.Request + } else { + return false + } + } + header := req.Header.Get("Traceparent") + if header != "" { + // do not use traceSampled flag defined by traceparent because + // flag's definition differs from expected by Cloud Tracing + traceID, spanID, _ := deconstructTraceParent(header) + if traceID != "" { + e.Trace = traceID + e.SpanID = spanID + return true + } + } + header = req.Header.Get("X-Cloud-Trace-Context") + if header != "" { + traceID, spanID, traceSampled := deconstructXCloudTraceContext(header) + if traceID != "" { + e.Trace = traceID + e.SpanID = spanID + // enforce sampling if required + e.TraceSampled = e.TraceSampled || traceSampled + return true + } + } + return false +} + +// As per format described at https://www.w3.org/TR/trace-context/#traceparent-header-field-values +var validTraceParentExpression = regexp.MustCompile(`^(00)-([a-fA-F\d]{32})-([a-f\d]{16})-([a-fA-F\d]{2})$`) + +func deconstructTraceParent(s string) (traceID, spanID string, traceSampled bool) { + matches := validTraceParentExpression.FindStringSubmatch(s) + if matches != nil { + // regexp package does not support negative lookahead preventing all 0 validations + if matches[2] == "00000000000000000000000000000000" || matches[3] == "0000000000000000" { + return + } + flags, err := strconv.ParseInt(matches[4], 16, 16) + if err == nil { + traceSampled = (flags & 0x01) == 1 + } + traceID, spanID = matches[2], matches[3] + } + return +} + +var validXCloudTraceContext = regexp.MustCompile( // Matches on "TRACE_ID" `([a-f\d]+)?` + // Matches on "/SPAN_ID" @@ -785,9 +789,11 @@ func deconstructXCloudTraceContext(s string) (traceID, spanID string, traceSampl // * traceID (optional): "105445aa7843bc8bf206b120001000" // * spanID (optional): "1" // * traceSampled (optional): true - matches := reCloudTraceContext.FindStringSubmatch(s) + matches := validXCloudTraceContext.FindStringSubmatch(s) - traceID, spanID, traceSampled = matches[1], matches[2], matches[3] == "1" + if matches != nil { + traceID, spanID, traceSampled = matches[1], matches[2], matches[3] == "1" + } if spanID == "0" { spanID = "" @@ -811,11 +817,20 @@ func deconstructXCloudTraceContext(s string) (traceID, spanID string, traceSampl // Logger.LogSync are used, it is intended to be used together with direct call // to WriteLogEntries method. func ToLogEntry(e Entry, parent string) (*logpb.LogEntry, error) { - // We have this method to support logging agents that need a bigger flexibility. - return toLogEntryInternal(e, nil, makeParent(parent)) + var l Logger + return l.ToLogEntry(e, parent) +} + +// ToLogEntry for Logger instance +func (l *Logger) ToLogEntry(e Entry, parent string) (*logpb.LogEntry, error) { + parent, err := makeParent(parent) + if err != nil { + return nil, err + } + return toLogEntryInternal(e, l, parent, 1) } -func toLogEntryInternal(e Entry, client *Client, parent string) (*logpb.LogEntry, error) { +func toLogEntryInternal(e Entry, l *Logger, parent string, skipLevels int) (*logpb.LogEntry, error) { if e.LogName != "" { return nil, errors.New("logging: Entry.LogName should be not be set when writing") } @@ -823,33 +838,34 @@ func toLogEntryInternal(e Entry, client *Client, parent string) (*logpb.LogEntry if t.IsZero() { t = now() } - ts, err := ptypes.TimestampProto(t) - if err != nil { - return nil, err - } - if e.Trace == "" && e.HTTPRequest != nil && e.HTTPRequest.Request != nil { - traceHeader := e.HTTPRequest.Request.Header.Get("X-Cloud-Trace-Context") - if traceHeader != "" { - // Set to a relative resource name, as described at - // https://cloud.google.com/appengine/docs/flexible/go/writing-application-logs. - traceID, spanID, traceSampled := deconstructXCloudTraceContext(traceHeader) - if traceID != "" { - e.Trace = fmt.Sprintf("%s/traces/%s", parent, traceID) - } - if e.SpanID == "" { - e.SpanID = spanID + ts := timestamppb.New(t) + if l != nil && l.populateSourceLocation != DoNotPopulateSourceLocation && e.SourceLocation == nil { + if l.populateSourceLocation == AlwaysPopulateSourceLocation || + l.populateSourceLocation == PopulateSourceLocationForDebugEntries && e.Severity == Severity(Debug) { + // filename and line are captured for source code that calls + // skipLevels up the goroutine calling stack + 1 for this func. + pc, file, line, ok := runtime.Caller(skipLevels + 1) + if ok { + details := runtime.FuncForPC(pc) + e.SourceLocation = &logpb.LogEntrySourceLocation{ + File: file, + Function: details.Name(), + Line: int64(line), + } } - - // If we previously hadn't set TraceSampled, let's retrieve it - // from the HTTP request's header, as per: - // https://cloud.google.com/trace/docs/troubleshooting#force-trace - e.TraceSampled = e.TraceSampled || traceSampled + } + } + if e.Trace == "" { + populateTraceInfo(&e, nil) + // format trace + if e.Trace != "" && !strings.Contains(e.Trace, "/traces/") { + e.Trace = fmt.Sprintf("%s/traces/%s", parent, e.Trace) } } req, err := fromHTTPRequest(e.HTTPRequest) if err != nil { - if client != nil { - client.error(err) + if l != nil && l.client != nil { + l.client.error(err) } else { return nil, err } @@ -870,6 +886,8 @@ func toLogEntryInternal(e Entry, client *Client, parent string) (*logpb.LogEntry switch p := e.Payload.(type) { case string: ent.Payload = &logpb.LogEntry_TextPayload{TextPayload: p} + case *anypb.Any: + ent.Payload = &logpb.LogEntry_ProtoPayload{ProtoPayload: p} default: s, err := toProtoStruct(p) if err != nil { @@ -879,3 +897,48 @@ func toLogEntryInternal(e Entry, client *Client, parent string) (*logpb.LogEntry } return ent, nil } + +// entry represents the fields of a logging.Entry that can be parsed by Logging agent. +// See the mappings at https://cloud.google.com/logging/docs/structured-logging#special-payload-fields +type structuredLogEntry struct { + // JsonMessage map[string]interface{} `json:"message,omitempty"` + // TextMessage string `json:"message,omitempty"` + Message json.RawMessage `json:"message"` + Severity string `json:"severity,omitempty"` + HTTPRequest *logtypepb.HttpRequest `json:"httpRequest,omitempty"` + Timestamp string `json:"timestamp,omitempty"` + Labels map[string]string `json:"logging.googleapis.com/labels,omitempty"` + InsertID string `json:"logging.googleapis.com/insertId,omitempty"` + Operation *logpb.LogEntryOperation `json:"logging.googleapis.com/operation,omitempty"` + SourceLocation *logpb.LogEntrySourceLocation `json:"logging.googleapis.com/sourceLocation,omitempty"` + SpanID string `json:"logging.googleapis.com/spanId,omitempty"` + Trace string `json:"logging.googleapis.com/trace,omitempty"` + TraceSampled bool `json:"logging.googleapis.com/trace_sampled,omitempty"` +} + +func serializeEntryToWriter(entry *logpb.LogEntry, w io.Writer) error { + jsonifiedEntry := structuredLogEntry{ + Severity: entry.Severity.String(), + HTTPRequest: entry.HttpRequest, + Timestamp: entry.Timestamp.String(), + Labels: entry.Labels, + InsertID: entry.InsertId, + Operation: entry.Operation, + SourceLocation: entry.SourceLocation, + SpanID: entry.SpanId, + Trace: entry.Trace, + TraceSampled: entry.TraceSampled, + } + var err error + if entry.GetTextPayload() != "" { + jsonifiedEntry.Message, err = json.Marshal(entry.GetTextPayload()) + } else if entry.GetJsonPayload() != nil { + jsonifiedEntry.Message, err = json.Marshal(entry.GetJsonPayload().AsMap()) + } else { + return ErrRedirectProtoPayloadNotSupported + } + if err == nil { + err = json.NewEncoder(w).Encode(jsonifiedEntry) + } + return err +} |
