103
104func closeBody(rc io.ReadCloser) {
105 if rc != nil {
106 if _, err := io.Copy(io.Discard, rc); err != nil {107 log.Error(err)108 }109 if err := rc.Close(); err != nil {110 log.Error(err)111 }112 }
113}
2050 }
2051 return nil
2052 })
2053 case updated.Load() == 0 && aeCount.Load() > 0:2054 err = errors.ErrSameVectorAlreadyExists(uuid, vec, vec)2055 err = status.WrapWithAlreadyExists(vald.UpdateRPCName+" API update target same vector already exists", err, reqInfo, resInfo)2056 if span != nil {2057 span.RecordError(err)2058 span.SetAttributes(trace.StatusCodeAlreadyExists(err.Error())...)2059 span.SetStatus(trace.StatusError, err.Error())2060 }2061 return nil, err2062
2063 }
2064 slices.Sort(ls)
2041 }
2042 return err
2043 }
2044 if loc != nil {2045 updated.Add(1)2046 mu.Lock()2047 locs.Ips = append(locs.GetIps(), loc.GetIps()...)2048 ls = append(ls, loc.GetName())2049 mu.Unlock()2050 }2051 return nil2052 })
2053 case updated.Load() == 0 && aeCount.Load() > 0:
2054 err = errors.ErrSameVectorAlreadyExists(uuid, vec, vec)
2024 span.End()
2025 }
2026 }()
2027 loc, err := vc.Insert(ctx, &payload.Insert_Request{2028 Vector: req.GetVector(),2029 Config: &payload.Insert_Config{2030 SkipStrictExistCheck: true,2031 Filters: req.GetConfig().GetFilters(),2032 Timestamp: req.GetConfig().GetTimestamp(),2033 },2034 }, copts...)2035 if err != nil {2036 st, ok := status.FromError(err)2037 if ok && st != nil && span != nil {2038 span.RecordError(err)2039 span.SetAttributes(trace.FromGRPCStatus(st.Code(), fmt.Sprintf("Shortage index Insert for Update operation for Agent %s failed,\terror: %v", target, err))...)2040 span.SetStatus(trace.StatusError, err.Error())2041 }2042 return err2043 }
2044 if loc != nil {
2045 updated.Add(1)
1990 }
1991 return nil
1992 })
1993 switch {1994 case err != nil:1995 st, msg, err := status.ParseError(err, codes.Internal,1996 "failed to parse "+vald.UpdateRPCName+" gRPC error response", reqInfo, resInfo, info.Get())1997 if span != nil {1998 span.RecordError(err)1999 span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)2000 span.SetStatus(trace.StatusError, err.Error())2001 }2002 return nil, err2003 case len(locs.Ips) <= 0:2004 err = errors.ErrIndexNotFound2005 err = status.WrapWithNotFound(vald.UpdateRPCName+" API update target not found", err, reqInfo, resInfo)2006 if span != nil {2007 span.RecordError(err)2008 span.SetAttributes(trace.StatusCodeNotFound(err.Error())...)2009 span.SetStatus(trace.StatusError, err.Error())2010 }2011 return nil, err2012 case updated.Load()+aeCount.Load() < uint64(s.replica):2013 shortage := s.replica - int(updated.Load()+aeCount.Load())2014 err = s.gateway.DoMulti(ctx, shortage, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) (err error) {2015 mu.RLock()2016 tf, ok := visited[target]2017 mu.RUnlock()2018 if tf && ok {2019 return errors.Errorf("target: %s already inserted will skip", target)2020 }2021 ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "DoMulti/"+target), apiName+"/"+vald.InsertRPCName+"/"+target)2022 defer func() {2023 if span != nil {2024 span.End()2025 }2026 }()
2027 loc, err := vc.Insert(ctx, &payload.Insert_Request{
2028 Vector: req.GetVector(),
A source line is considered covered when at least one instruction that is assigned to this line has been executed by a test case. These lines were not executed during any of the test cases.