Monday, March 12, 2012

Issue with SqlUserDefinedAggregate

I am using the code below, but I am getting a "zero" result for the dbo.AggregIssue('Test') user-defined aggregate every time the query executes with parallel processing and uses the "Merge" method. It seems that my private variable "private List<string> myList" gets nullified every time it goes through "Merge".

I saw other people reporting the same issue in other forums, but nobody was able to provide a solution or explanation.

See below a simplified version of my code (posted just after the queries) that replicates the issue.

The query below works because it is not processed in parallel.

SELECT GroupID, dbo.AggregIssue('Test')
FROM MyTable
WHERE fund = 2
GROUP BY GroupID

The query below doesn't work because it is processed in parallel.

SELECT GroupID, dbo.AggregIssue('Test')
FROM MyTable
WHERE fund <= 20
GROUP BY GroupID

[Serializable]
[SqlUserDefinedAggregate(
    Format.UserDefined,
    IsInvariantToNulls = true,
    IsInvariantToDuplicates = false,
    IsInvariantToOrder = true,
    MaxByteSize = 1000)]
public class AggregIssue : IBinarySerialize {

    private List<string> myList;
    private int myResult;

    public void Init() {
        myList = new List<string>();
    }

    public void Accumulate(SqlString Value) {
        if (Value.IsNull) { return; }
        myList.Add(Value.ToString());
    }

    public void Merge(AggregIssue Other) {
        if (Other.myList != null) {
            if (myList == null) {
                myList = Other.myList;
            }
            else {
                myList.AddRange(Other.myList);
            }
        }
    }

    public SqlInt32 Terminate() {
        return new SqlInt32(myResult);
    }

    public void Read(BinaryReader r) {
        myResult = r.ReadInt32();
    }

    // The code below is simplified for posting in the forum.
    // I do additional manipulation of the list and require
    // the aggregation to be IBinarySerialize.
    // But this code replicates the issue also.
    public void Write(BinaryWriter w) {
        w.Write(myList.Count);
    }
}

||| If you want to maintain the list properly, you should serialize/deserialize the list itself -- and not just its count -- in the Read and Write methods. They may be called more than once during the course of aggregation. Your current code is probably failing because it makes assumptions about how and when these will be called. You should instead do something like:

public void Write(BinaryWriter w) {
    w.Write(myList.Count);
    foreach (string theString in myList)
        w.Write(theString);
}

public void Read(BinaryReader r) {
    this.myList = new List<string>();
    int numStrings = r.ReadInt32();
    for (int i = 0; i < numStrings; i++)
        myList.Add(r.ReadString());
}

Then, in your Terminate method:

public SqlInt32 Terminate() {
    return new SqlInt32(myList.Count);
}

-- Adam Machanic
Pro SQL Server 2005, available now
http://www.apress.com/book/bookDisplay.html?bID=457
|||

Hi Adam,

Thank you very much for your response.

The reason I don't serialize the list itself is the MaxByteSize limitation of 8096; my list will easily go beyond that. I tested your solution and it works for small lists, but not for long ones.

I noticed that the list is losing its contents in the merge. If the query doesn't go through parallel threads, it works fine.

My assumption was that serialization would happen only before Terminate and the final output of the aggregate value for each row of the result set.

Any other ideas?

Thanks!!

Fernando

|||Hi, Fernando,

I think Adam is right that you didn't serialize the list. Other people on the forum are talking about "breaking the 8K boundary", but I don't know what conclusion they have reached. Since you are using a List to join strings together, you will always face this problem.

About Merge() stuff, to my limited understanding:

If single thread (no parallel op):

Init() -> Accumulate() -> Terminate()

If multi threads (parallel plan):

T1: Init() -> Accumulate()
T2: Init() -> Accumulate() -> Merge(with T1)
T3: Init() -> Accumulate() -> Merge(with T2) -> Terminate()

This is only to illustrate the way Merge() works. Any T can be reused during this, which is why Init() must reset everything.
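That lifecycle can be sketched outside SQL Server. Below is a hypothetical Python simulation (SQLCLR aggregates are written in C#, but the driving pattern is language-agnostic) of a parallel plan using the thread's count-only Read/Write; the round_trip helper and the values fed in are illustrative, not SQL Server internals. It shows how a serialization round trip at Merge time discards the list:

```python
import io
import struct


class AggregIssue:
    """Python stand-in for the C# UDAgg; names follow the thread's code."""

    def __init__(self):
        self.my_list = None   # C# reference fields default to null
        self.my_result = 0

    def init(self):
        self.my_list = []

    def accumulate(self, value):
        if value is not None:
            self.my_list.append(value)

    def merge(self, other):
        if other.my_list is not None:
            if self.my_list is None:
                self.my_list = other.my_list
            else:
                self.my_list.extend(other.my_list)

    def terminate(self):
        return self.my_result

    # Buggy serialization from the original post: only the count survives,
    # so a deserialized instance has my_list == None.
    def write(self, w):
        w.write(struct.pack("<i", len(self.my_list)))

    def read(self, r):
        self.my_result = struct.unpack("<i", r.read(4))[0]


def round_trip(agg):
    # The engine may serialize and deserialize an instance around Merge().
    buf = io.BytesIO()
    agg.write(buf)
    buf.seek(0)
    fresh = AggregIssue()   # fresh instance: my_list is None again
    fresh.read(buf)
    return fresh


# Parallel plan: T1 and T2 each accumulate, then the engine merges.
t1, t2 = AggregIssue(), AggregIssue()
t1.init(); t2.init()
t1.accumulate("a"); t1.accumulate("b")
t2.accumulate("c")

t2.merge(round_trip(t1))      # T1's list was lost in the round trip
final = round_trip(t2)        # one more round trip before Terminate()
print(final.terminate())      # 1, not the expected 3
```

With full-list serialization in write/read (as Adam suggests), the same driver would report 3.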

In Adam's book Chapter 6, p. 195, I quote:

"It is important to understand when dealing with aggregates that the intermediate result will be serialized and deserialized once per row of aggregated data. Therefore, it is imperative for performance that serialization and deserialization be as efficient as possible"

I'm a bit confused here -- could Adam explain this paragraph? What's the relationship between Merge() and "once per row of aggregated data"?

Regards,

Dong Xie
||| Actually, that quote from the book is not quite accurate anymore -- when I wrote it against an earlier CTP it appeared to be true, but now it does not. I believe it has been changed in a later printing of the book, but I'll check today with Apress to make sure. Thanks for pointing it out!

Anyway, the correct phrasing at this point should be: "the intermediate result can be serialized and deserialized up to one time per row of aggregated data."

In other words, the SQL Server engine may not do it for every row (or even nearly that often in most cases), but you need to code your aggregate as if it will. There is not necessarily a direct/stated relationship between Merge and Read/Write, but it appears that when Merge is called the engine also does a round of serialization/deserialization. I'm not sure why, though. Hopefully Steven Hemingray or one of the other dataworks guys will show up in this thread and clarify!

-- Adam Machanic
Pro SQL Server 2005, available now
http://www.apress.com/book/bookDisplay.html?bID=457
|||

It seems that you are right, and serialization/deserialization happens when Merge is called.

For now I have a workaround that was suggested in another forum: the OPTION (MAXDOP 1) query hint. It works because Merge is never executed.

It is not ideal, as parallelism cannot be leveraged, but it is a workaround.

Thanks to all for the responses!!

Fernando

|||

There seem to be a few issues within this thread:

1) Why is the private field myList nullified?

Whenever serialization takes place (Read/Write), a new instance is instantiated, and it is up to your serialization code to fill that instance. Since your Read() only sets myResult, myList will remain at its default value of null.

2) Why is MaxByteSize not always enforced?

MaxByteSize is enforced during serialization. You could let instances grow much larger than 8K across Accumulate() calls and then serialize out less than 8K.

With this said, there should not be a reason to build UDAggs that grow larger than 8K but do not serialize out that much: Read/Write should serialize all the information the UDAgg needs.

The UDAgg code within this thread is a good example of a contrived case for this. Since Terminate() only returns the count, there is no need to accumulate the actual strings; Accumulate() could simply increment a counter instead.
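That counter-only design can be sketched with the same hypothetical Python stand-in used above for the lifecycle (the StringCount class and round_trip helper are illustrative, not SQLCLR code). Because Read/Write round-trip the aggregate's entire state, a serialization round trip at any point is harmless:

```python
import io
import struct


class StringCount:
    """Counter-only rewrite of the thread's aggregate: since Terminate()
    only needs the count, keep an int rather than the list of strings."""

    def __init__(self):
        self.count = 0

    def init(self):
        self.count = 0

    def accumulate(self, value):
        if value is not None:
            self.count += 1

    def merge(self, other):
        self.count += other.count

    def terminate(self):
        return self.count

    # Read/Write round-trip the entire state (a single int), so a
    # deserialized instance is indistinguishable from the original.
    def write(self, w):
        w.write(struct.pack("<i", self.count))

    def read(self, r):
        self.count = struct.unpack("<i", r.read(4))[0]


def round_trip(agg):
    # Simulate the engine serializing and deserializing around Merge().
    buf = io.BytesIO()
    agg.write(buf)
    buf.seek(0)
    fresh = StringCount()
    fresh.read(buf)
    return fresh


t1, t2 = StringCount(), StringCount()
t1.init(); t2.init()
t1.accumulate("a"); t1.accumulate("b")
t2.accumulate("c")
t2.merge(round_trip(t1))           # the round trip loses nothing now
print(round_trip(t2).terminate())  # 3
```

The int state also serializes in four bytes, well inside any MaxByteSize limit.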

3) When is Read/Write called?

Within SQL Server 2005, serialization takes place during Merge() and before Terminate() is called. If the UDAgg provided accessed myList within Terminate(), you would see a NullReferenceException from within Terminate() (since Read() does not reconstruct myList, and null is its default reference).

As pointed out within this thread, Read/Write is not called for every row. However, please write your UDAgg with proper serialization semantics, as if it were called for every row.

Hope that helps!

-Jason

|||

Jason,

Thanks for all your explanation. It makes sense to me now how all this work.

But I still have one issue, which is the limitation of 8K. The list that I am building cannot be contrived until it is complete just before "Terminate". In the example I am passing the count and I could do that in the merge also. But in my real case, I need the complete list to be used in the resolution of a non-linear equation, so I cannot do anything with it until I pass it to a iterative algorithm in order to solve the equation. But this list can grow bigger than 8K.

The only way to avoid so far is to limit the degree of parallelism to 1, so the code doesn't go to merge.

Any thoughts on this?

Thanks,

Fernando

|||

Fernando-

Merge is also called in some more complex queries (such as GROUP BY WITH CUBE).

This is an interesting workaround to the 8K limitation for your scenario. I do worry that it leaves your UDAgg vulnerable to internal changes within SQL Server that cause serialization to be performed more frequently, or eliminated. For this reason, I'd recommend always using an approach where instances recreated through Read/Write are no different from the original.

I cannot recommend this approach for the reasons above, but limiting DOP to 1 should eliminate Merge() calls for the most part. If you absolutely must use this workaround, I'd recommend safeguarding against your assumptions (that Merge is never called, and that serialization takes place once, after all Accumulate calls but before Terminate):

Always throw an exception within your Merge() method so that, if Merge() is ever invoked, you'll know what the issue is and can try to simplify the query so that Merge is not called.|||
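A minimal sketch of that safeguard, again as a hypothetical Python stand-in for the C# aggregate (in SQLCLR one would throw, e.g., an InvalidOperationException from Merge):

```python
class NoMergeAggregate:
    """Sketch of the safeguard: this design assumes Merge() is never
    called (the MAXDOP 1 workaround), so it fails loudly if the plan
    goes parallel instead of silently returning wrong results."""

    def __init__(self):
        self.my_list = None

    def init(self):
        self.my_list = []

    def accumulate(self, value):
        if value is not None:
            self.my_list.append(value)

    def merge(self, other):
        # The assumption behind the workaround has been violated.
        raise RuntimeError(
            "Merge() was called: the plan went parallel, so the "
            "serialization assumptions of this aggregate no longer hold.")

    def terminate(self):
        return len(self.my_list)


a, b = NoMergeAggregate(), NoMergeAggregate()
a.init(); b.init()
a.accumulate("x")

merge_raised = False
try:
    a.merge(b)
except RuntimeError:
    merge_raised = True
print("merge raised:", merge_raised)
```

The exception surfaces as a query error, which is far easier to diagnose than a silently wrong aggregate value.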

Jason,

Thanks for all your help and explanations.

Fernando
